From c3a0a1711360c866a38b273dcf681f3aab9ae0ae Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 3 Jan 2021 15:43:55 +0100 Subject: Adapt workflow engine to use event tracer --- .../org/opendc/experiments/sc18/TestExperiment.kt | 3 +++ simulator/opendc-workflows/build.gradle.kts | 1 + .../workflows/service/StageWorkflowService.kt | 31 ++++++++++++++-------- .../org/opendc/workflows/service/WorkflowEvent.kt | 23 +++++++++------- .../opendc/workflows/service/WorkflowService.kt | 2 +- .../StageWorkflowSchedulerIntegrationTest.kt | 14 +++++++--- 6 files changed, 49 insertions(+), 25 deletions(-) (limited to 'simulator') diff --git a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt index 3786eebf..1221c7d3 100644 --- a/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc-experiments/opendc-experiments-sc18/src/main/kotlin/org/opendc/experiments/sc18/TestExperiment.kt @@ -31,6 +31,7 @@ import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import org.opendc.workflows.service.StageWorkflowService import org.opendc.workflows.service.WorkflowEvent import org.opendc.workflows.service.WorkflowSchedulerMode @@ -59,6 +60,7 @@ public fun main(args: Array) { val token = Channel() val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) @@ -67,6 +69,7 @@ public fun main(args: Array) { StageWorkflowService( this, clock, + tracer, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, diff --git a/simulator/opendc-workflows/build.gradle.kts b/simulator/opendc-workflows/build.gradle.kts index f61bdac6..fa03508c 100644 --- a/simulator/opendc-workflows/build.gradle.kts +++ b/simulator/opendc-workflows/build.gradle.kts @@ -30,6 +30,7 @@ plugins { dependencies { api(project(":opendc-core")) api(project(":opendc-compute:opendc-compute-core")) + api(project(":opendc-trace:opendc-trace-core")) implementation(project(":opendc-utils")) testImplementation(project(":opendc-simulator:opendc-simulator-core")) diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 3b4e6eab..91657f83 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import org.opendc.compute.core.Server @@ -33,7 +34,9 @@ import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.utils.flow.EventFlow +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.consumeAsFlow +import org.opendc.trace.core.enable import org.opendc.workflows.service.stage.job.JobAdmissionPolicy import org.opendc.workflows.service.stage.job.JobOrderPolicy import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -51,6 +54,7 @@ import java.util.* public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, + internal val tracer: EventTracer, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -168,7 +172,6 @@ public class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator - private val eventFlow = EventFlow() init { coroutineScope.launch { @@ -185,7 +188,14 @@ public class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override val events: Flow = eventFlow + override val events: Flow = tracer.openRecording().let { + it.enable() + it.enable() + it.enable() + it.enable() + it.enable() + it.consumeAsFlow().map { event -> event as WorkflowEvent } + } override suspend fun submit(job: Job) { // J1 Incoming Jobs @@ -209,6 +219,7 @@ public class StageWorkflowService( instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) requestCycle() } @@ -237,7 +248,7 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) + tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) rootListener.jobStarted(jobInstance) } @@ -307,12 +318,11 @@ public class StageWorkflowService( ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskStarted( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskStarted(task) @@ -325,12 +335,11 @@ public class StageWorkflowService( job.tasks.remove(task) available += task.host!! activeTasks -= task - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskFinished( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskFinished(task) @@ -357,7 +366,7 @@ public class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) + tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt index dadccb50..bcf93562 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt @@ -22,25 +22,33 @@ package org.opendc.workflows.service +import org.opendc.trace.core.Event import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task /** * An event emitted by the [WorkflowService]. */ -public sealed class WorkflowEvent { +public sealed class WorkflowEvent : Event() { /** * The [WorkflowService] that emitted the event. */ public abstract val service: WorkflowService + /** + * This event is emitted when a job was submitted to the scheduler. + */ + public data class JobSubmitted( + override val service: WorkflowService, + public val job: Job + ) : WorkflowEvent() + /** * This event is emitted when a job has become active. */ public data class JobStarted( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -48,8 +56,7 @@ public sealed class WorkflowEvent { */ public data class JobFinished( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -58,8 +65,7 @@ public sealed class WorkflowEvent { public data class TaskStarted( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() /** @@ -68,7 +74,6 @@ public sealed class WorkflowEvent { public data class TaskFinished( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt index 319a8b85..b24f80da 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt @@ -34,7 +34,7 @@ import java.util.* */ public interface WorkflowService { /** - * Thie events emitted by the workflow scheduler. + * The events emitted by the workflow scheduler. */ public val events: Flow diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 90cf5b99..62955a11 100644 --- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -35,10 +35,12 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll import org.opendc.compute.core.metal.service.ProvisioningService import org.opendc.format.environment.sc18.Sc18EnvironmentReader import org.opendc.format.trace.gwf.GwfTraceReader import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.trace.core.EventTracer import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy @@ -66,6 +68,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val testScope = TestCoroutineScope() val clock = DelayControllerClockAdapter(testScope) + val tracer = EventTracer(clock) val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) @@ -74,6 +77,7 @@ internal class StageWorkflowSchedulerIntegrationTest { StageWorkflowService( testScope, clock, + tracer, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -113,9 +117,11 @@ internal class StageWorkflowSchedulerIntegrationTest { testScope.advanceUntilIdle() - assertNotEquals(0, jobsSubmitted, "No jobs submitted") - assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") - assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") - assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") + assertAll( + { assertNotEquals(0, jobsSubmitted, "No jobs submitted") }, + { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") }, + { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") }, + { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") } + ) } } -- cgit v1.2.3