diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-21 22:04:31 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:51:27 +0100 |
| commit | 76bfeb44c5a02be143c152c52bc1029cff360744 (patch) | |
| tree | be467a0be698df2ebb4dd9fd3c5410d1e53ffa46 /opendc/opendc-workflows/src/test | |
| parent | bc64182612ad06f15bff5b48637ed7d241e293b2 (diff) | |
refactor: Migrate to Flow for event listeners
Diffstat (limited to 'opendc/opendc-workflows/src/test')
| -rw-r--r-- | opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt | 40 |
1 files changed, 18 insertions, 22 deletions
diff --git a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 19e56482..5ee6d5e6 100644 --- a/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -29,17 +29,16 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelectionPolicy import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertEquals @@ -64,24 +63,6 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - jobsStarted++ - } - - override suspend fun onJobFinish(job: Job, time: Long) { - jobsFinished++ - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - tasksStarted++ - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - tasksFinished++ - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -104,6 +85,21 @@ internal class StageWorkflowSchedulerIntegrationTest { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> jobsStarted++ + is WorkflowEvent.JobFinished -> jobsFinished++ + is WorkflowEvent.TaskStarted -> tasksStarted++ + is WorkflowEvent.TaskFinished -> tasksFinished++ + } + } + .collect() + } + broker.launch { val ctx = simulationContext val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) @@ -113,7 +109,7 @@ internal class StageWorkflowSchedulerIntegrationTest { val (time, job) = reader.next() jobsSubmitted++ delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } |
