diff options
Diffstat (limited to 'opendc/opendc-experiments-sc18')
| -rw-r--r-- | opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt | 51 |
1 files changed, 25 insertions, 26 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index d5e1404a..b0182ab3 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -29,8 +29,8 @@ import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader -import com.atlarge.opendc.workflows.monitor.WorkflowMonitor import com.atlarge.opendc.workflows.service.StageWorkflowService +import com.atlarge.opendc.workflows.service.WorkflowEvent import com.atlarge.opendc.workflows.service.WorkflowSchedulerMode import com.atlarge.opendc.workflows.service.stage.job.NullJobAdmissionPolicy import com.atlarge.opendc.workflows.service.stage.job.SubmissionTimeJobOrderPolicy @@ -38,12 +38,12 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import com.atlarge.opendc.workflows.workload.Job -import com.atlarge.opendc.workflows.workload.Task import kotlin.math.max import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File @@ -62,28 +62,6 @@ fun main(args: Array<String>) { var finished = 0 val token = Channel<Boolean>() - - val monitor = object : WorkflowMonitor { - override suspend fun onJobStart(job: Job, time: Long) { - println("Job ${job.uid} started") - } - - override suspend fun onJobFinish(job: Job, time: Long) { - finished += 1 - println("Jobs $finished/$total finished (${job.tasks.size} tasks)") - - if (finished == total) { - token.send(true) - } - } - - override suspend fun onTaskStart(job: Job, task: Task, time: Long) { - } - - override suspend fun onTaskFinish(job: Job, task: Task, status: Int, time: Long) { - } - } - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider(name = "sim") @@ -106,6 +84,27 @@ fun main(args: Array<String>) { } val broker = system.newDomain(name = "broker") + + broker.launch { + val scheduler = schedulerAsync.await() + scheduler.events + .onEach { event -> + when (event) { + is WorkflowEvent.JobStarted -> { + println("Job ${event.job.uid} started") + } + is WorkflowEvent.JobFinished -> { + finished += 1 + println("Jobs $finished/$total finished (${event.job.tasks.size} tasks)") + + if (finished == total) { + token.send(true) + } + } + } + } + .collect() + } broker.launch { val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) @@ -115,7 +114,7 @@ fun main(args: Array<String>) { val (time, job) = reader.next() total += 1 delay(max(0, time * 1000 - ctx.clock.millis())) - scheduler.submit(job, monitor) + scheduler.submit(job) } } |
