summaryrefslogtreecommitdiff
path: root/opendc/opendc-experiments-sc18/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc/opendc-experiments-sc18/src')
-rw-r--r--opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt34
1 files changed, 20 insertions, 14 deletions
diff --git a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index 96796c07..d5e1404a 100644
--- a/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -25,6 +25,7 @@
package com.atlarge.opendc.experiments.sc18
import com.atlarge.odcsim.SimulationEngineProvider
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -39,12 +40,14 @@ import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
import com.atlarge.opendc.workflows.workload.Job
import com.atlarge.opendc.workflows.workload.Task
+import kotlin.math.max
+import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
import java.util.ServiceLoader
-import kotlin.math.max
/**
* Main entry point of the experiment.
@@ -55,9 +58,6 @@ fun main(args: Array<String>) {
return
}
- val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.read() }
-
var total = 0
var finished = 0
@@ -85,11 +85,16 @@ fun main(args: Array<String>) {
}
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider({ ctx ->
- println(ctx.clock.instant())
- val scheduler = StageWorkflowService(
- ctx,
- environment.platforms[0].zones[0].services[ProvisioningService.Key],
+ val system = provider(name = "sim")
+
+ val schedulerDomain = system.newDomain(name = "scheduler")
+ val schedulerAsync = schedulerDomain.async {
+ val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
+ .use { it.construct(system.newDomain("topology")) }
+
+ StageWorkflowService(
+ schedulerDomain,
+ environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
jobOrderPolicy = SubmissionTimeJobOrderPolicy(),
@@ -98,8 +103,13 @@ fun main(args: Array<String>) {
resourceFilterPolicy = FunctionalResourceFilterPolicy,
resourceSelectionPolicy = FirstFitResourceSelectionPolicy
)
+ }
+ val broker = system.newDomain(name = "broker")
+ broker.launch {
+ val ctx = simulationContext
val reader = GwfTraceReader(File(args[0]))
+ val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
@@ -107,11 +117,7 @@ fun main(args: Array<String>) {
delay(max(0, time * 1000 - ctx.clock.millis()))
scheduler.submit(job, monitor)
}
-
- token.receive()
-
- println(ctx.clock.instant())
- }, name = "sim")
+ }
runBlocking {
system.run()