diff options
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc18/src/main')
| -rw-r--r-- | simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt | 39 |
1 files changed, 15 insertions, 24 deletions
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index c7577824..0cece647 100644 --- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.experiments.sc18 -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -38,20 +36,19 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.math.max /** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { if (args.isEmpty()) { println("error: Please provide path to GWF trace") @@ -62,17 +59,16 @@ fun main(args: Array<String>) { var finished = 0 val token = Channel<Boolean>() - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(this, clock) } StageWorkflowService( - schedulerDomain, - simulationContext.clock, + this, + clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -84,9 +80,7 @@ fun main(args: Array<String>) { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -106,21 +100,18 @@ fun main(args: Array<String>) { } .collect() } - broker.launch { - val ctx = simulationContext + + testScope.launch { val reader = GwfTraceReader(File(args[0])) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() total += 1 - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } |
