summaryrefslogtreecommitdiff
path: root/simulator/opendc-workflows/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-03 15:43:55 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-01-03 16:08:28 +0100
commitc3a0a1711360c866a38b273dcf681f3aab9ae0ae (patch)
treee5ce165378b1d70fa4a9b348acbde707e1e1aaea /simulator/opendc-workflows/src
parent48a768b873e390825178073232ce18b429cdf4c3 (diff)
Adapt workflow engine to use event tracer
Diffstat (limited to 'simulator/opendc-workflows/src')
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt31
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt23
-rw-r--r--simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt2
-rw-r--r--simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt14
4 files changed, 45 insertions, 25 deletions
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
index 3b4e6eab..91657f83 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/StageWorkflowService.kt
@@ -26,6 +26,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.opendc.compute.core.Server
@@ -33,7 +34,9 @@ import org.opendc.compute.core.ServerEvent
import org.opendc.compute.core.ServerState
import org.opendc.compute.core.metal.Node
import org.opendc.compute.core.metal.service.ProvisioningService
-import org.opendc.utils.flow.EventFlow
+import org.opendc.trace.core.EventTracer
+import org.opendc.trace.core.consumeAsFlow
+import org.opendc.trace.core.enable
import org.opendc.workflows.service.stage.job.JobAdmissionPolicy
import org.opendc.workflows.service.stage.job.JobOrderPolicy
import org.opendc.workflows.service.stage.resource.ResourceFilterPolicy
@@ -51,6 +54,7 @@ import java.util.*
public class StageWorkflowService(
internal val coroutineScope: CoroutineScope,
internal val clock: Clock,
+ internal val tracer: EventTracer,
private val provisioningService: ProvisioningService,
mode: WorkflowSchedulerMode,
jobAdmissionPolicy: JobAdmissionPolicy,
@@ -168,7 +172,6 @@ public class StageWorkflowService(
private val taskEligibilityPolicy: TaskEligibilityPolicy.Logic
private val resourceFilterPolicy: ResourceFilterPolicy.Logic
private val resourceSelectionPolicy: Comparator<Node>
- private val eventFlow = EventFlow<WorkflowEvent>()
init {
coroutineScope.launch {
@@ -185,7 +188,14 @@ public class StageWorkflowService(
this.resourceSelectionPolicy = resourceSelectionPolicy(this)
}
- override val events: Flow<WorkflowEvent> = eventFlow
+ override val events: Flow<WorkflowEvent> = tracer.openRecording().let {
+ it.enable<WorkflowEvent.JobSubmitted>()
+ it.enable<WorkflowEvent.JobStarted>()
+ it.enable<WorkflowEvent.JobFinished>()
+ it.enable<WorkflowEvent.TaskStarted>()
+ it.enable<WorkflowEvent.TaskFinished>()
+ it.consumeAsFlow().map { event -> event as WorkflowEvent }
+ }
override suspend fun submit(job: Job) {
// J1 Incoming Jobs
@@ -209,6 +219,7 @@ public class StageWorkflowService(
instances.values.toCollection(jobInstance.tasks)
incomingJobs += jobInstance
rootListener.jobSubmitted(jobInstance)
+ tracer.commit(WorkflowEvent.JobSubmitted(this, jobInstance.job))
requestCycle()
}
@@ -237,7 +248,7 @@ public class StageWorkflowService(
iterator.remove()
jobQueue.add(jobInstance)
activeJobs += jobInstance
- eventFlow.emit(WorkflowEvent.JobStarted(this, jobInstance.job, clock.millis()))
+ tracer.commit(WorkflowEvent.JobStarted(this, jobInstance.job))
rootListener.jobStarted(jobInstance)
}
@@ -307,12 +318,11 @@ public class StageWorkflowService(
ServerState.ACTIVE -> {
val task = taskByServer.getValue(server)
task.startedAt = clock.millis()
- eventFlow.emit(
+ tracer.commit(
WorkflowEvent.TaskStarted(
this@StageWorkflowService,
task.job.job,
- task.task,
- clock.millis()
+ task.task
)
)
rootListener.taskStarted(task)
@@ -325,12 +335,11 @@ public class StageWorkflowService(
job.tasks.remove(task)
available += task.host!!
activeTasks -= task
- eventFlow.emit(
+ tracer.commit(
WorkflowEvent.TaskFinished(
this@StageWorkflowService,
task.job.job,
- task.task,
- clock.millis()
+ task.task
)
)
rootListener.taskFinished(task)
@@ -357,7 +366,7 @@ public class StageWorkflowService(
private suspend fun finishJob(job: JobState) {
activeJobs -= job
- eventFlow.emit(WorkflowEvent.JobFinished(this, job.job, clock.millis()))
+ tracer.commit(WorkflowEvent.JobFinished(this, job.job))
rootListener.jobFinished(job)
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
index dadccb50..bcf93562 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowEvent.kt
@@ -22,25 +22,33 @@
package org.opendc.workflows.service
+import org.opendc.trace.core.Event
import org.opendc.workflows.workload.Job
import org.opendc.workflows.workload.Task
/**
* An event emitted by the [WorkflowService].
*/
-public sealed class WorkflowEvent {
+public sealed class WorkflowEvent : Event() {
/**
* The [WorkflowService] that emitted the event.
*/
public abstract val service: WorkflowService
/**
+ * This event is emitted when a job was submitted to the scheduler.
+ */
+ public data class JobSubmitted(
+ override val service: WorkflowService,
+ public val job: Job
+ ) : WorkflowEvent()
+
+ /**
* This event is emitted when a job has become active.
*/
public data class JobStarted(
override val service: WorkflowService,
- public val job: Job,
- public val time: Long
+ public val job: Job
) : WorkflowEvent()
/**
@@ -48,8 +56,7 @@ public sealed class WorkflowEvent {
*/
public data class JobFinished(
override val service: WorkflowService,
- public val job: Job,
- public val time: Long
+ public val job: Job
) : WorkflowEvent()
/**
@@ -58,8 +65,7 @@ public sealed class WorkflowEvent {
public data class TaskStarted(
override val service: WorkflowService,
public val job: Job,
- public val task: Task,
- public val time: Long
+ public val task: Task
) : WorkflowEvent()
/**
@@ -68,7 +74,6 @@ public sealed class WorkflowEvent {
public data class TaskFinished(
override val service: WorkflowService,
public val job: Job,
- public val task: Task,
- public val time: Long
+ public val task: Task
) : WorkflowEvent()
}
diff --git a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
index 319a8b85..b24f80da 100644
--- a/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
+++ b/simulator/opendc-workflows/src/main/kotlin/org/opendc/workflows/service/WorkflowService.kt
@@ -34,7 +34,7 @@ import java.util.*
*/
public interface WorkflowService {
/**
- * Thie events emitted by the workflow scheduler.
+ * The events emitted by the workflow scheduler.
*/
public val events: Flow<WorkflowEvent>
diff --git a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 90cf5b99..62955a11 100644
--- a/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc-workflows/src/test/kotlin/org/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -35,10 +35,12 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
import org.opendc.compute.core.metal.service.ProvisioningService
import org.opendc.format.environment.sc18.Sc18EnvironmentReader
import org.opendc.format.trace.gwf.GwfTraceReader
import org.opendc.simulator.utils.DelayControllerClockAdapter
+import org.opendc.trace.core.EventTracer
import org.opendc.workflows.service.stage.job.NullJobAdmissionPolicy
import org.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy
import org.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy
@@ -66,6 +68,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
val testScope = TestCoroutineScope()
val clock = DelayControllerClockAdapter(testScope)
+ val tracer = EventTracer(clock)
val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
@@ -74,6 +77,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
StageWorkflowService(
testScope,
clock,
+ tracer,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
@@ -113,9 +117,11 @@ internal class StageWorkflowSchedulerIntegrationTest {
testScope.advanceUntilIdle()
- assertNotEquals(0, jobsSubmitted, "No jobs submitted")
- assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started")
- assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished")
- assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished")
+ assertAll(
+ { assertNotEquals(0, jobsSubmitted, "No jobs submitted") },
+ { assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") },
+ { assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") },
+ { assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") }
+ )
}
}