diff options
Diffstat (limited to 'opendc/opendc-experiments-sc18/src/main')
| -rw-r--r-- | opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index 96796c07..d5e1404a 100644 --- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -25,6 +25,7 @@ package com.atlarge.opendc.experiments.sc18 import com.atlarge.odcsim.SimulationEngineProvider +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -39,12 +40,14 @@ import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy import com.atlarge.opendc.workflows.workload.Job import com.atlarge.opendc.workflows.workload.Task +import kotlin.math.max +import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File import java.util.ServiceLoader -import kotlin.math.max /** * Main entry point of the experiment. @@ -55,9 +58,6 @@ fun main(args: Array<String>) { return } - val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.read() } - var total = 0 var finished = 0 @@ -85,11 +85,16 @@ fun main(args: Array<String>) { } val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider({ ctx -> - println(ctx.clock.instant()) - val scheduler = StageWorkflowService( - ctx, - environment.platforms[0].zones[0].services[ProvisioningService.Key], + val system = provider(name = "sim") + + val schedulerDomain = system.newDomain(name = "scheduler") + val schedulerAsync = schedulerDomain.async { + val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) + .use { it.construct(system.newDomain("topology")) } + + StageWorkflowService( + schedulerDomain, + environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, jobOrderPolicy = SubmissionTimeJobOrderPolicy(), @@ -98,8 +103,13 @@ fun main(args: Array<String>) { resourceFilterPolicy = FunctionalResourceFilterPolicy, resourceSelectionPolicy = FirstFitResourceSelectionPolicy ) + } + val broker = system.newDomain(name = "broker") + broker.launch { + val ctx = simulationContext val reader = GwfTraceReader(File(args[0])) + val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() @@ -107,11 +117,7 @@ fun main(args: Array<String>) { delay(max(0, time * 1000 - ctx.clock.millis())) scheduler.submit(job, monitor) } - - token.receive() - - println(ctx.clock.instant()) - }, name = "sim") + } runBlocking { system.run() |
