summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflows
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-07 17:25:40 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-07 23:33:57 +0100
commit9cf24c9a8d3e96a29d9b111081bc3369aadd490d (patch)
tree4f378ee9f77d8623a67a403135a4010afd5f9000 /simulator/opendc-workflows
parent74a4bff83bfb6366cc193d1fc9c4a07e49649649 (diff)
Refactor workflow service to schedule tasks onto VMs
This change updates the workflow service to delegate the resource scheduling logic to the virtualized resource provisioner.
Diffstat (limited to 'simulator/opendc-workflows')
-rw-r--r--simulator/opendc-workflows/build.gradle.kts2
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt72
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt4
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt36
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt41
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt45
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt32
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt5
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt21
10 files changed, 48 insertions, 255 deletions
diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts
index fa03508c..4346efcc 100644
--- a/simulator/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc-workflows/build.gradle.kts
@@ -32,8 +32,10 @@ dependencies {
api(project(":opendc-compute:opendc-compute-core"))
api(project(":opendc-trace:opendc-trace-core"))
implementation(project(":opendc-utils"))
+ implementation("io.github.microutils:kotlin-logging:1.7.9")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-compute:opendc-compute-simulator"))
testImplementation(project(":opendc-format"))
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.9.8") {
exclude("org.jetbrains.kotlin", module = "kotlin-reflect")
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 91657f83..2c8d9a0b 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -29,41 +29,43 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
+import mu.KotlinLogging
+import org.opendc.compute.core.Flavor
import org.opendc.compute.core.Server
import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.ServerState
-import org.opendc.compute.core.metal.Node
-import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.core.virt.service.VirtProvisioningService
import org.opendc.trace.core.EventTracer
import org.opendc.trace.core.consumeAsFlow
import org.opendc.trace.core.enable
import org.opendc.workflows.service.stage.job.JobAdmissionPolicy
import org.opendc.workflows.service.stage.job.JobOrderPolicy
-import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy
-import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy
import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy
import org.opendc.workflows.service.stage.task.TaskOrderPolicy
import org.opendc.workflows.workload.Job
+import org.opendc.workflows.workload.WORKFLOW_TASK_CORES
import java.time.Clock
import java.util.*
/**
* A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for
- * Topology Scheduling.
+ * Datacenter Scheduling.
*/
public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
internal val tracer: EventTracer,
- private val provisioningService: ProvisioningService,
+ private val provisioningService: VirtProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
jobOrderPolicy: JobOrderPolicy,
taskEligibilityPolicy: TaskEligibilityPolicy,
- taskOrderPolicy: TaskOrderPolicy,
- resourceFilterPolicy: ResourceFilterPolicy,
- resourceSelectionPolicy: ResourceSelectionPolicy
+ taskOrderPolicy: TaskOrderPolicy
) : WorkflowService {
+ /**
+ * The logger instance to use.
+ */
+ private val logger = KotlinLogging.logger {}
/**
* The incoming jobs ready to be processed by the scheduler.
@@ -101,25 +103,10 @@ public class StageWorkflowService(
internal val taskByServer = mutableMapOf<Server, TaskState>()
/**
- * The nodes that are controlled by the service.
- */
- internal lateinit var nodes: List<Node>
-
- /**
- * The available nodes.
- */
- internal val available: MutableSet<Node> = mutableSetOf()
-
- /**
- * The maximum number of incoming jobs.
- */
- private val throttleLimit: Int = 20000
-
- /**
* The load of the system.
*/
internal val load: Double
- get() = (available.size / nodes.size.toDouble())
+ get() = (activeTasks.size / provisioningService.hostCount.toDouble())
/**
* The root listener of this scheduler.
@@ -170,22 +157,13 @@ public class StageWorkflowService(
private val mode: WorkflowSchedulerMode.Logic
private val jobAdmissionPolicy: JobAdmissionPolicy.Logic
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
- private val resourceFilterPolicy: ResourceFilterPolicy.Logic
- private val resourceSelectionPolicy: Comparator<Node>
init {
- coroutineScope.launch {
- nodes = provisioningService.nodes().toList()
- available.addAll(nodes)
- }
-
this.mode = mode(this)
this.jobAdmissionPolicy = jobAdmissionPolicy(this)
this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid })
this.taskEligibilityPolicy = taskEligibilityPolicy(this)
this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid })
- this.resourceFilterPolicy = resourceFilterPolicy(this)
- this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
@@ -290,26 +268,25 @@ public class StageWorkflowService(
// T3 Per task
while (taskQueue.isNotEmpty()) {
val instance = taskQueue.peek()
- val host: Node? = available.firstOrNull()
- if (host != null) {
- // T4 Submit task to machine
- available -= host
+ val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1
+ val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task
+ val image = instance.task.image
+ coroutineScope.launch {
+ val server = provisioningService.deploy(instance.task.name, image, flavor)
+
instance.state = TaskStatus.ACTIVE
- val newHost = provisioningService.deploy(host, instance.task.image)
- val server = newHost.server!!
- instance.host = newHost
+ instance.server = server
taskByServer[server] = instance
+
server.events
.onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) }
.launchIn(coroutineScope)
-
- activeTasks += instance
- taskQueue.poll()
- rootListener.taskAssigned(instance)
- } else {
- break
}
+
+ activeTasks += instance
+ taskQueue.poll()
+ rootListener.taskAssigned(instance)
}
}
@@ -333,7 +310,6 @@ public class StageWorkflowService(
task.state = TaskStatus.FINISHED
task.finishedAt = clock.millis()
job.tasks.remove(task)
- available += task.host!!
activeTasks -= task
tracer.commit(
WorkflowEvent.TaskFinished(
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
index ed023c82..d1eb6704 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt
@@ -22,7 +22,7 @@
package org.opendc.workflows.service
-import org.opendc.compute.core.metal.Node
+import org.opendc.compute.core.Server
import org.opendc.workflows.workload.Task
public class TaskState(public val job: JobState, public val task: Task) {
@@ -62,7 +62,7 @@ public class TaskState(public val job: JobState, public val task: Task) {
}
}
- public var host: Node? = null
+ public var server: Server? = null
/**
* Mark the specified [TaskView] as terminated.
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
deleted file mode 100644
index 8dc323ec..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-
-/**
- * A [ResourceSelectionPolicy] that selects the first machine that is available.
- */
-public object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<Node> =
- Comparator<Node> { _, _ -> 1 }
-
- override fun toString(): String = "First-Fit"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
deleted file mode 100644
index ac79a9ce..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-import org.opendc.workflows.service.TaskState
-
-/**
- * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for
- * the task.
- */
-public object FunctionalResourceFilterPolicy : ResourceFilterPolicy {
- override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic =
- object : ResourceFilterPolicy.Logic {
- override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> =
- hosts.filter { it in scheduler.available }
- }
-
- override fun toString(): String = "functional"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
deleted file mode 100644
index caf87c70..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.StageWorkflowService
-import java.util.*
-
-/**
- * A [ResourceSelectionPolicy] that randomly orders the machines.
- */
-public object RandomResourceSelectionPolicy : ResourceSelectionPolicy {
- override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = object : Comparator<Node> {
- private val ids: Map<Node, Long>
-
- init {
- val random = Random(123)
- ids = scheduler.nodes.associateWith { random.nextLong() }
- }
-
- override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] }
- }
-
- override fun toString(): String = "Random"
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
deleted file mode 100644
index 4923a34b..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.TaskState
-import org.opendc.workflows.service.stage.StagePolicy
-
-/**
- * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and
- * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic
- * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc.
- */
-public interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> {
- public interface Logic {
- /**
- * Filter the list of machines based on dynamic information.
- *
- * @param hosts The hosts to filter.
- * @param task The task that is to be scheduled.
- * @return The machines on which the task can be scheduled.
- */
- public operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node>
- }
-}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
deleted file mode 100644
index 990b990a..00000000
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright (c) 2020 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.workflows.service.stage.resource
-
-import org.opendc.compute.core.metal.Node
-import org.opendc.workflows.service.stage.StagePolicy
-
-/**
- * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected
- * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit.
- */
-public interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>>
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
index d02e2b4e..4305aa57 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt
@@ -28,3 +28,8 @@ package org.opendc.workflows.workload
* Meta-data key for the deadline of a task.
*/
public const val WORKFLOW_TASK_DEADLINE: String = "workflow:task:deadline"
+
+/**
+ * Meta-data key for the number of cores needed for a task.
+ */
+public const val WORKFLOW_TASK_CORES: String = "workflow:task:cores"
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 62955a11..b97cb915 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -37,14 +37,14 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.metal.service.ProvisioningService
+import org.opendc.compute.simulator.SimVirtProvisioningService
+import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
-import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
-import org.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import kotlin.math.max
@@ -74,18 +74,26 @@ internal class StageWorkflowSchedulerIntegrationTest {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
.use { it.construct(testScope, clock) }
+ val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
+ val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, schedulingQuantum = 1000)
+
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
StageWorkflowService(
testScope,
clock,
tracer,
- environment.platforms[0].zones[0].services[ProvisioningService],
+ provisioner,
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
taskEligibilityPolicy = NullTaskEligibilityPolicy,
taskOrderPolicy = SubmissionTimeTaskOrderPolicy(),
- resourceFilterPolicy = FunctionalResourceFilterPolicy,
- resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
}
@@ -110,7 +118,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
while (reader.hasNext()) {
val (time, job) = reader.next()
jobsSubmitted++
- delay(max(0, time * 1000 - clock.millis()))
+ delay(max(0, time - clock.millis()))
scheduler.submit(job)
}
}
@@ -118,6 +126,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope.advanceUntilIdle()
assertAll(
+ { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) },
{ assertNotEquals(0, jobsSubmitted, "No jobs submitted") },
{ assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") },
{ assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") },