diff options
Diffstat (limited to 'simulator/opendc-workflows/src')
11 files changed, 94 insertions, 282 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 3b4e6eab..e04c8a4c 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -26,40 +26,46 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import mu.KotlinLogging +import org.opendc.compute.core.Flavor import org.opendc.compute.core.Server import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState -import org.opendc.compute.core.metal.Node -import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.utils.flow.EventFlow +import org.opendc.compute.core.virt.service.VirtProvisioningService +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.consumeAsFlow +import org.opendc.trace.core.enable import org.opendc.workflows.service.stage.job.JobAdmissionPolicy import org.opendc.workflows.service.stage.job.JobOrderPolicy -import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy -import org.opendc.workflows.service.stage.resource.ResourceSelectionPolicy import org.opendc.workflows.service.stage.task.TaskEligibilityPolicy import org.opendc.workflows.service.stage.task.TaskOrderPolicy import org.opendc.workflows.workload.Job +import org.opendc.workflows.workload.WORKFLOW_TASK_CORES import java.time.Clock import java.util.* /** * A [WorkflowService] that distributes work through a multi-stage process based on the Reference Architecture for - * Topology Scheduling. + * Datacenter Scheduling. */ public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, - private val provisioningService: ProvisioningService, + internal val tracer: EventTracer, + private val provisioningService: VirtProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, jobOrderPolicy: JobOrderPolicy, taskEligibilityPolicy: TaskEligibilityPolicy, - taskOrderPolicy: TaskOrderPolicy, - resourceFilterPolicy: ResourceFilterPolicy, - resourceSelectionPolicy: ResourceSelectionPolicy + taskOrderPolicy: TaskOrderPolicy ) : WorkflowService { + /** + * The logger instance to use. + */ + private val logger = KotlinLogging.logger {} /** * The incoming jobs ready to be processed by the scheduler. @@ -97,25 +103,10 @@ public class StageWorkflowService( internal val taskByServer = mutableMapOf<Server, TaskState>() /** - * The nodes that are controlled by the service. - */ - internal lateinit var nodes: List<Node> - - /** - * The available nodes. - */ - internal val available: MutableSet<Node> = mutableSetOf() - - /** - * The maximum number of incoming jobs. - */ - private val throttleLimit: Int = 20000 - - /** * The load of the system. */ internal val load: Double - get() = (available.size / nodes.size.toDouble()) + get() = (activeTasks.size / provisioningService.hostCount.toDouble()) /** * The root listener of this scheduler. @@ -166,26 +157,23 @@ public class StageWorkflowService( private val mode: WorkflowSchedulerMode.Logic private val jobAdmissionPolicy: JobAdmissionPolicy.Logic private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic - private val resourceFilterPolicy: ResourceFilterPolicy.Logic - private val resourceSelectionPolicy: Comparator<Node> - private val eventFlow = EventFlow<WorkflowEvent>() init { - coroutineScope.launch { - nodes = provisioningService.nodes().toList() - available.addAll(nodes) - } - this.mode = mode(this) this.jobAdmissionPolicy = jobAdmissionPolicy(this) this.jobQueue = PriorityQueue(100, jobOrderPolicy(this).thenBy { it.job.uid }) this.taskEligibilityPolicy = taskEligibilityPolicy(this) this.taskQueue = PriorityQueue(1000, taskOrderPolicy(this).thenBy { it.task.uid }) - this.resourceFilterPolicy = resourceFilterPolicy(this) - this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override val events: Flow<WorkflowEvent> = eventFlow + override val events: Flow<WorkflowEvent> = tracer.openRecording().let { + it.enable<WorkflowEvent.JobSubmitted>() + it.enable<WorkflowEvent.JobStarted>() + it.enable<WorkflowEvent.JobFinished>() + it.enable<WorkflowEvent.TaskStarted>() + it.enable<WorkflowEvent.TaskFinished>() + it.consumeAsFlow().map { event -> event as WorkflowEvent } + } override suspend fun submit(job: Job) { // J1 Incoming Jobs @@ -209,6 +197,7 @@ public class StageWorkflowService( instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) requestCycle() } @@ -237,7 +226,7 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) + tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) rootListener.jobStarted(jobInstance) } @@ -279,26 +268,25 @@ public class StageWorkflowService( // T3 Per task while (taskQueue.isNotEmpty()) { val instance = taskQueue.peek() - val host: Node? = available.firstOrNull() - if (host != null) { - // T4 Submit task to machine - available -= host + val cores = instance.task.metadata[WORKFLOW_TASK_CORES] as? Int ?: 1 + val flavor = Flavor(cores, 1000) // TODO How to determine memory usage for workflow task + val image = instance.task.image + coroutineScope.launch { + val server = provisioningService.deploy(instance.task.name, image, flavor) + instance.state = TaskStatus.ACTIVE - val newHost = provisioningService.deploy(host, instance.task.image) - val server = newHost.server!! - instance.host = newHost + instance.server = server taskByServer[server] = instance + server.events .onEach { event -> if (event is ServerEvent.StateChanged) stateChanged(event.server) } .launchIn(coroutineScope) - - activeTasks += instance - taskQueue.poll() - rootListener.taskAssigned(instance) - } else { - break } + + activeTasks += instance + taskQueue.poll() + rootListener.taskAssigned(instance) } } @@ -307,12 +295,11 @@ public class StageWorkflowService( ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskStarted( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskStarted(task) @@ -323,14 +310,12 @@ public class StageWorkflowService( task.state = TaskStatus.FINISHED task.finishedAt = clock.millis() job.tasks.remove(task) - available += task.host!! activeTasks -= task - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskFinished( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskFinished(task) @@ -355,9 +340,9 @@ public class StageWorkflowService( } } - private suspend fun finishJob(job: JobState) { + private fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) + tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt index ed023c82..d1eb6704 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/TaskState.kt @@ -22,7 +22,7 @@ package org.opendc.workflows.service -import org.opendc.compute.core.metal.Node +import org.opendc.compute.core.Server import org.opendc.workflows.workload.Task public class TaskState(public val job: JobState, public val task: Task) { @@ -62,7 +62,7 @@ public class TaskState(public val job: JobState, public val task: Task) { } } - public var host: Node? = null + public var server: Server? = null /** * Mark the specified [TaskView] as terminated. diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt index dadccb50..bcf93562 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt @@ -22,25 +22,33 @@ package org.opendc.workflows.service +import org.opendc.trace.core.Event import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task /** * An event emitted by the [WorkflowService]. */ -public sealed class WorkflowEvent { +public sealed class WorkflowEvent : Event() { /** * The [WorkflowService] that emitted the event. */ public abstract val service: WorkflowService /** + * This event is emitted when a job was submitted to the scheduler. + */ + public data class JobSubmitted( + override val service: WorkflowService, + public val job: Job + ) : WorkflowEvent() + + /** * This event is emitted when a job has become active. */ public data class JobStarted( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -48,8 +56,7 @@ public sealed class WorkflowEvent { */ public data class JobFinished( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -58,8 +65,7 @@ public sealed class WorkflowEvent { public data class TaskStarted( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() /** @@ -68,7 +74,6 @@ public sealed class WorkflowEvent { public data class TaskFinished( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt index 319a8b85..b24f80da 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt @@ -34,7 +34,7 @@ import java.util.* */ public interface WorkflowService { /** - * Thie events emitted by the workflow scheduler. + * The events emitted by the workflow scheduler. */ public val events: Flow<WorkflowEvent> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt deleted file mode 100644 index 8dc323ec..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FirstFitResourceSelectionPolicy.kt +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService - -/** - * A [ResourceSelectionPolicy] that selects the first machine that is available. - */ -public object FirstFitResourceSelectionPolicy : ResourceSelectionPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = - Comparator<Node> { _, _ -> 1 } - - override fun toString(): String = "First-Fit" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt deleted file mode 100644 index ac79a9ce..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/FunctionalResourceFilterPolicy.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService -import org.opendc.workflows.service.TaskState - -/** - * A [ResourceFilterPolicy] based on the amount of cores available on the machine and the cores required for - * the task. - */ -public object FunctionalResourceFilterPolicy : ResourceFilterPolicy { - override fun invoke(scheduler: StageWorkflowService): ResourceFilterPolicy.Logic = - object : ResourceFilterPolicy.Logic { - override fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> = - hosts.filter { it in scheduler.available } - } - - override fun toString(): String = "functional" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt deleted file mode 100644 index caf87c70..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/RandomResourceSelectionPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.StageWorkflowService -import java.util.* - -/** - * A [ResourceSelectionPolicy] that randomly orders the machines. - */ -public object RandomResourceSelectionPolicy : ResourceSelectionPolicy { - override fun invoke(scheduler: StageWorkflowService): Comparator<Node> = object : Comparator<Node> { - private val ids: Map<Node, Long> - - init { - val random = Random(123) - ids = scheduler.nodes.associateWith { random.nextLong() } - } - - override fun compare(o1: Node, o2: Node): Int = compareValuesBy(o1, o2) { ids[it] } - } - - override fun toString(): String = "Random" -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt deleted file mode 100644 index 4923a34b..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceFilterPolicy.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.TaskState -import org.opendc.workflows.service.stage.StagePolicy - -/** - * This interface represents stages **R2**, **R3** and **R4** stage of the Reference Architecture for Schedulers and - * acts as a filter yielding a list of resources with sufficient resource-capacities, based on fixed or dynamic - * requirements, and on predicted or monitored information about processing unit availability, memory occupancy, etc. - */ -public interface ResourceFilterPolicy : StagePolicy<ResourceFilterPolicy.Logic> { - public interface Logic { - /** - * Filter the list of machines based on dynamic information. - * - * @param hosts The hosts to filter. - * @param task The task that is to be scheduled. - * @return The machines on which the task can be scheduled. - */ - public operator fun invoke(hosts: Sequence<Node>, task: TaskState): Sequence<Node> - } -} diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt deleted file mode 100644 index 990b990a..00000000 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/stage/resource/ResourceSelectionPolicy.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.workflows.service.stage.resource - -import org.opendc.compute.core.metal.Node -import org.opendc.workflows.service.stage.StagePolicy - -/** - * This interface represents the **R5** stage of the Reference Architecture for Schedulers and matches the the selected - * task with a (set of) resource(s), using policies such as First-Fit, Worst-Fit, and Best-Fit. - */ -public interface ResourceSelectionPolicy : StagePolicy<Comparator<Node>> diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt index d02e2b4e..4305aa57 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/workload/Metadata.kt @@ -28,3 +28,8 @@ package org.opendc.workflows.workload * Meta-data key for the deadline of a task. */ public const val WORKFLOW_TASK_DEADLINE: String = "workflow:task:deadline" + +/** + * Meta-data key for the number of cores needed for a task. + */ +public const val WORKFLOW_TASK_CORES: String = "workflow:task:cores" diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 90cf5b99..2bfcba35 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,14 +35,17 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.service.ProvisioningService +import org.opendc.compute.simulator.SimVirtProvisioningService +import org.opendc.compute.simulator.allocation.NumberOfActiveServersAllocationPolicy import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader +import org.opendc.simulator.compute.SimSpaceSharedHypervisorProvider import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy -import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy -import org.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import org.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import org.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import kotlin.math.max @@ -57,7 +60,7 @@ internal class StageWorkflowSchedulerIntegrationTest { * A large integration test where we check whether all tasks in some trace are executed correctly. */ @Test - fun `should execute all tasks in trace`() { + fun testTrace() { var jobsSubmitted = 0L var jobsStarted = 0L var jobsFinished = 0L @@ -66,22 +69,32 @@ internal class StageWorkflowSchedulerIntegrationTest { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) .use { it.construct(testScope, clock) } + val bareMetal = environment.platforms[0].zones[0].services[ProvisioningService] + + // Wait for the bare metal nodes to be spawned + delay(10) + + val provisioner = SimVirtProvisioningService(testScope, clock, bareMetal, NumberOfActiveServersAllocationPolicy(), tracer, SimSpaceSharedHypervisorProvider(), schedulingQuantum = 1000) + + // Wait for the hypervisors to be spawned + delay(10) + StageWorkflowService( testScope, clock, - environment.platforms[0].zones[0].services[ProvisioningService], + tracer, + provisioner, mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), taskEligibilityPolicy = NullTaskEligibilityPolicy, taskOrderPolicy = SubmissionTimeTaskOrderPolicy(), - resourceFilterPolicy = FunctionalResourceFilterPolicy, - resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) } @@ -106,16 +119,19 @@ internal class StageWorkflowSchedulerIntegrationTest { while (reader.hasNext()) { val (time, job) = reader.next() jobsSubmitted++ - delay(max(0, time * 1000 - clock.millis())) + delay(max(0, time - clock.millis())) scheduler.submit(job) } } testScope.advanceUntilIdle() - assertNotEquals(0, jobsSubmitted, "No jobs submitted") - assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") - assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") - assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") + assertAll( + { assertEquals(emptyList<Throwable>(), testScope.uncaughtExceptions) }, + { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, + { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, + { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, + { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") } + ) } } |
