summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-experiments-sc18/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc18/src')
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt39
1 files changed, 15 insertions, 24 deletions
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index c7577824..0cece647 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.experiments.sc18
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -38,20 +36,19 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.math.max
/**
* Main entry point of the experiment.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
if (args.isEmpty()) {
println("error: Please provide path to GWF trace")
@@ -62,17 +59,16 @@ fun main(args: Array<String>) {
var finished = 0
val token = Channel<Boolean>()
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(this, clock) }
StageWorkflowService(
- schedulerDomain,
- simulationContext.clock,
+ this,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
@@ -84,9 +80,7 @@ fun main(args: Array<String>) {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -106,21 +100,18 @@ fun main(args: Array<String>) {
}
.collect()
}
- broker.launch {
- val ctx = simulationContext
+
+ testScope.launch {
val reader = GwfTraceReader(File(args[0]))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
total += 1
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}