diff options
Diffstat (limited to 'simulator/opendc-workflows/src/main/kotlin')
3 files changed, 35 insertions, 21 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt index 3b4e6eab..91657f83 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import org.opendc.compute.core.Server @@ -33,7 +34,9 @@ import org.opendc.compute.core.ServerEvent import org.opendc.compute.core.ServerState import org.opendc.compute.core.metal.Node import org.opendc.compute.core.metal.service.ProvisioningService -import org.opendc.utils.flow.EventFlow +import org.opendc.trace.core.EventTracer +import org.opendc.trace.core.consumeAsFlow +import org.opendc.trace.core.enable import org.opendc.workflows.service.stage.job.JobAdmissionPolicy import org.opendc.workflows.service.stage.job.JobOrderPolicy import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy @@ -51,6 +54,7 @@ import java.util.* public class StageWorkflowService( internal val coroutineScope: CoroutineScope, internal val clock: Clock, + internal val tracer: EventTracer, private val provisioningService: ProvisioningService, mode: WorkflowSchedulerMode, jobAdmissionPolicy: JobAdmissionPolicy, @@ -168,7 +172,6 @@ public class StageWorkflowService( private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic private val resourceFilterPolicy: ResourceFilterPolicy.Logic private val resourceSelectionPolicy: Comparator<Node> - private val eventFlow = EventFlow<WorkflowEvent>() init { coroutineScope.launch { @@ -185,7 +188,14 @@ public class StageWorkflowService( this.resourceSelectionPolicy = resourceSelectionPolicy(this) } - override val events: Flow<WorkflowEvent> = eventFlow + override val events: Flow<WorkflowEvent> = tracer.openRecording().let { + it.enable<WorkflowEvent.JobSubmitted>() + it.enable<WorkflowEvent.JobStarted>() + it.enable<WorkflowEvent.JobFinished>() + it.enable<WorkflowEvent.TaskStarted>() + it.enable<WorkflowEvent.TaskFinished>() + it.consumeAsFlow().map { event -> event as WorkflowEvent } + } override suspend fun submit(job: Job) { // J1 Incoming Jobs @@ -209,6 +219,7 @@ public class StageWorkflowService( instances.values.toCollection(jobInstance.tasks) incomingJobs += jobInstance rootListener.jobSubmitted(jobInstance) + tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job)) requestCycle() } @@ -237,7 +248,7 @@ public class StageWorkflowService( iterator.remove() jobQueue.add(jobInstance) activeJobs += jobInstance - eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis())) + tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job)) rootListener.jobStarted(jobInstance) } @@ -307,12 +318,11 @@ public class StageWorkflowService( ServerState.ACTIVE -> { val task = taskByServer.getValue(server) task.startedAt = clock.millis() - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskStarted( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskStarted(task) @@ -325,12 +335,11 @@ public class StageWorkflowService( job.tasks.remove(task) available += task.host!! activeTasks -= task - eventFlow.emit( + tracer.commit( WorkflowEvent.TaskFinished( this@StageWorkflowService, task.job.job, - task.task, - clock.millis() + task.task ) ) rootListener.taskFinished(task) @@ -357,7 +366,7 @@ public class StageWorkflowService( private suspend fun finishJob(job: JobState) { activeJobs -= job - eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis())) + tracer.commit(WorkflowEvent.JobFinished(this, job.job)) rootListener.jobFinished(job) } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt index dadccb50..bcf93562 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt @@ -22,25 +22,33 @@ package org.opendc.workflows.service +import org.opendc.trace.core.Event import org.opendc.workflows.workload.Job import org.opendc.workflows.workload.Task /** * An event emitted by the [WorkflowService]. */ -public sealed class WorkflowEvent { +public sealed class WorkflowEvent : Event() { /** * The [WorkflowService] that emitted the event. */ public abstract val service: WorkflowService /** + * This event is emitted when a job was submitted to the scheduler. + */ + public data class JobSubmitted( + override val service: WorkflowService, + public val job: Job + ) : WorkflowEvent() + + /** * This event is emitted when a job has become active. */ public data class JobStarted( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -48,8 +56,7 @@ public sealed class WorkflowEvent { */ public data class JobFinished( override val service: WorkflowService, - public val job: Job, - public val time: Long + public val job: Job ) : WorkflowEvent() /** @@ -58,8 +65,7 @@ public sealed class WorkflowEvent { public data class TaskStarted( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() /** @@ -68,7 +74,6 @@ public sealed class WorkflowEvent { public data class TaskFinished( override val service: WorkflowService, public val job: Job, - public val task: Task, - public val time: Long + public val task: Task ) : WorkflowEvent() } diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt index 319a8b85..b24f80da 100644 --- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt +++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt @@ -34,7 +34,7 @@ import java.util.* */ public interface WorkflowService { /** - * Thie events emitted by the workflow scheduler. + * The events emitted by the workflow scheduler. */ public val events: Flow<WorkflowEvent> |
