summaryrefslogtreecommitdiff
path: root/simulator/opendc
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:56:07 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:56:07 +0200
commitfcae560208df4860bc7461f955bf3b522b0e61c5 (patch)
tree933f47f1061274a6a7e648da82c13f08fce41ea5 /simulator/opendc
parent1766888d6dde44f96508a4bc6878978ddcaa073d (diff)
Migrate from Domain to TestCoroutineScope
This change eliminates the use of Domain and simulationContext in favour of the generic (Test)CoroutineScope and Clock classes. In this way, we decouple the OpenDC modules and their logic from simulation-related code. In this way, we also simplify eventual attempt for emulating OpenDC componments in real-time.
Diffstat (limited to 'simulator/opendc')
-rw-r--r--simulator/opendc/opendc-compute/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt12
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt31
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt25
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt41
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt11
-rw-r--r--simulator/opendc/opendc-experiments-sc18/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt39
-rw-r--r--simulator/opendc/opendc-experiments-sc20/build.gradle.kts1
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt61
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt29
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt52
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt3
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt6
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt6
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt5
-rw-r--r--simulator/opendc/opendc-runner-web/build.gradle.kts1
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt27
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt5
-rw-r--r--simulator/opendc/opendc-workflows/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt39
21 files changed, 176 insertions, 224 deletions
diff --git a/simulator/opendc/opendc-compute/build.gradle.kts b/simulator/opendc/opendc-compute/build.gradle.kts
index 376c4269..0e44785e 100644
--- a/simulator/opendc/opendc-compute/build.gradle.kts
+++ b/simulator/opendc/opendc-compute/build.gradle.kts
@@ -34,7 +34,7 @@ dependencies {
api(project(":opendc:opendc-core"))
implementation("io.github.microutils:kotlin-logging:1.7.9")
- testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testImplementation(project(":opendc:opendc-simulator"))
testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index df45f440..bd266208 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -39,18 +39,10 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
@@ -100,6 +92,8 @@ class SimpleVirtDriver(
init {
launch {
try {
+ // Yield first to allow class variables to initialize
+ yield()
scheduler()
} catch (e: Exception) {
if (e !is CancellationException) {
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 80c9c547..7b57327e 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -24,53 +24,52 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID
+@OptIn(ExperimentalCoroutinesApi::class)
internal class SimpleBareMetalDriverTest {
/**
* A smoke test for the bare-metal driver.
*/
@Test
fun smoke() {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+
var finalState: ServerState = ServerState.BUILD
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("sim")
- val root = system.newDomain(name = "root")
- root.launch {
- val dom = root.newDomain(name = "driver")
+ testScope.launch {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
- withContext(dom.coroutineContext) {
+ withContext(coroutineContext) {
driver.init()
driver.setImage(image)
val server = driver.start().server!!
driver.usage
- .onEach { println("${simulationContext.clock.millis()} $it") }
+ .onEach { println("${clock.millis()} $it") }
.launchIn(this)
server.events.collect { event ->
when (event) {
is ServerEvent.StateChanged -> {
- println("${simulationContext.clock.millis()} $event")
+ println("${clock.millis()} $event")
finalState = event.server.state
}
}
@@ -78,11 +77,7 @@ internal class SimpleBareMetalDriverTest {
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
-
+ testScope.advanceUntilIdle()
assertEquals(ServerState.SHUTOFF, finalState)
}
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index 37cd5898..0a85e0f9 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -24,40 +24,38 @@
package com.atlarge.opendc.compute.metal.service
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
*/
+@OptIn(ExperimentalCoroutinesApi::class)
internal class SimpleProvisioningServiceTest {
/**
* A basic smoke test.
*/
@Test
fun smoke() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("sim")
- val root = system.newDomain(name = "root")
- root.launch {
- val clock = simulationContext.clock
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+
+ testScope.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val provisioner = SimpleProvisioningService()
provisioner.create(driver)
@@ -67,9 +65,6 @@ internal class SimpleProvisioningServiceTest {
node.server!!.events.collect { println(it) }
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 528434b1..dca0b292 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.compute.virt
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -39,17 +37,18 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID
/**
* Basic test-suite for the hypervisor.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
internal class HypervisorTest {
/**
* A smoke test for the bare-metal driver.
@@ -58,21 +57,17 @@ internal class HypervisorTest {
@Test
@Disabled
fun smoke() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- root.launch {
- val clock = simulationContext.clock
+ testScope.launch {
val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
- val driverDom = root.newDomain("driver")
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -90,10 +85,7 @@ internal class HypervisorTest {
vmB.events.onEach { println(it) }.launchIn(this)
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}
/**
@@ -101,16 +93,14 @@ internal class HypervisorTest {
*/
@Test
fun overcommission() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
var requestedBurst = 0L
var grantedBurst = 0L
var overcommissionedBurst = 0L
- root.launch {
- val clock = simulationContext.clock
+ testScope.launch {
val vmm = HypervisorImage
val duration = 5 * 60L
val vmImageA = VmImage(
@@ -140,11 +130,9 @@ internal class HypervisorTest {
0
)
- val driverDom = root.newDomain("driver")
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -170,10 +158,7 @@ internal class HypervisorTest {
vmDriver.spawn("b", vmImageB, flavor)
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
assertAll(
{ assertEquals(2073600, requestedBurst, "Requested Burst does not match") },
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 1b896858..e96974f7 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -24,9 +24,9 @@
package com.atlarge.opendc.core.failure
-import com.atlarge.odcsim.simulationContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
+import java.time.Clock
import kotlin.math.ln1p
import kotlin.math.pow
import kotlin.random.Random
@@ -35,7 +35,12 @@ import kotlin.random.Random
* A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
* independent.
*/
-public class UncorrelatedFaultInjector(private val alpha: Double, private val beta: Double, private val random: Random = Random(0)) : FaultInjector {
+class UncorrelatedFaultInjector(
+ private val clock: Clock,
+ private val alpha: Double,
+ private val beta: Double,
+ private val random: Random = Random(0)
+) : FaultInjector {
/**
* Enqueue the specified [FailureDomain] to fail some time in the future.
*/
@@ -44,7 +49,7 @@ public class UncorrelatedFaultInjector(private val alpha: Double, private val be
val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds
// Handle long overflow
- if (simulationContext.clock.millis() + d <= 0) {
+ if (clock.millis() + d <= 0) {
return@launch
}
diff --git a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts
index 2e366a43..5e01f387 100644
--- a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts
@@ -38,9 +38,9 @@ dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
implementation(project(":opendc:opendc-workflows"))
+ implementation(project(":opendc:opendc-simulator"))
implementation(kotlin("stdlib"))
- runtimeOnly(project(":odcsim:odcsim-engine-omega"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index c7577824..0cece647 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.experiments.sc18
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -38,20 +36,19 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.math.max
/**
* Main entry point of the experiment.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
if (args.isEmpty()) {
println("error: Please provide path to GWF trace")
@@ -62,17 +59,16 @@ fun main(args: Array<String>) {
var finished = 0
val token = Channel<Boolean>()
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(this, clock) }
StageWorkflowService(
- schedulerDomain,
- simulationContext.clock,
+ this,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
@@ -84,9 +80,7 @@ fun main(args: Array<String>) {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -106,21 +100,18 @@ fun main(args: Array<String>) {
}
.collect()
}
- broker.launch {
- val ctx = simulationContext
+
+ testScope.launch {
val reader = GwfTraceReader(File(args[0]))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
total += 1
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}
diff --git a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
index a66612e4..713354e7 100644
--- a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -38,6 +38,7 @@ application {
dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
+ implementation(project(":opendc:opendc-simulator"))
implementation("com.github.ajalt:clikt:2.6.0")
implementation("me.tongfei:progressbar:0.8.1")
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 3765f307..b68ee97e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
@@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
+import java.time.Clock
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {}
* Construct the failure domain for the experiments.
*/
suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
seed: Int,
failureInterval: Double,
bareMetalProvisioner: ProvisioningService,
chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
+): CoroutineScope {
+ val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
@@ -86,7 +81,8 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext,
+ this,
+ clock,
random,
failureInterval
)
@@ -94,18 +90,18 @@ suspend fun createFailureDomain(
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
- return domain
+ return CoroutineScope(coroutineScope.coroutineContext + job)
}
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- simulationContext.domain,
- simulationContext.clock,
+ coroutineScope,
+ clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
suspend fun createProvisioner(
- root: Domain,
+ coroutineScope: CoroutineScope,
+ clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
+): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
- bareMetalProvisioner to scheduler
+ return bareMetalProvisioner to scheduler
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
- val domain = simulationContext.domain
- val clock = simulationContext.clock
+suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
@@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
}
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
hypervisor.events
.onEach { event ->
when (event) {
@@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
val driver = hypervisor.server.services[BareMetalDriver.Key]
driver.powerDraw
.onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
scheduler.events
@@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
+suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
try {
var submitted = 0
@@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val (time, workload) = reader.next()
submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
chan.send(Unit)
val server = scheduler.deploy(
workload.image.name,
@@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
server.events
.onEach {
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
+ monitor.reportVmStateChange(clock.millis(), it.server)
}
}
.collect()
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 7b42b095..76a10e56 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
@@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.random.Random
/**
@@ -52,19 +53,15 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
* same set of parameters.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
override suspend fun invoke(context: ExperimentExecutionContext) {
val experiment = parent.parent.parent
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val seeder = Random(seed)
val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
@@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
parent.parent.parent.bufferSize
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seeder.nextInt(),
parent.operationalPhenomena.failureFrequency,
bareMetalProvisioner,
@@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5ecf7605..ebee1543 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationEngine
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
@@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
+import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
class Sc20IntegrationTest {
/**
- * The simulation engine to use.
+ * The [TestCoroutineScope] to use.
*/
- private lateinit var simulationEngine: SimulationEngine
+ private lateinit var testScope: TestCoroutineScope
/**
- * The root simulation domain to run in.
+ * The simulation clock to use.
*/
- private lateinit var root: Domain
+ private lateinit var clock: Clock
/**
* The monitor used to keep track of the metrics.
@@ -79,9 +79,9 @@ class Sc20IntegrationTest {
*/
@BeforeEach
fun setUp() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- simulationEngine = provider("test")
- root = simulationEngine.newDomain("root")
+ testScope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(testScope)
+
monitor = TestExperimentReporter()
}
@@ -89,9 +89,7 @@ class Sc20IntegrationTest {
* Tear down the experimental environment.
*/
@AfterEach
- fun tearDown() = runBlocking {
- simulationEngine.terminate()
- }
+ fun tearDown() = testScope.cleanupTestCoroutines()
@Test
fun smoke() {
@@ -103,9 +101,10 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader()
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
@@ -115,6 +114,8 @@ class Sc20IntegrationTest {
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seed,
24.0 * 7,
bareMetalProvisioner,
@@ -124,8 +125,10 @@ class Sc20IntegrationTest {
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -159,16 +162,19 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader("single")
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
scheduler = res.second
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -195,9 +201,7 @@ class Sc20IntegrationTest {
/**
* Run the simulation.
*/
- private fun runSimulation() = runBlocking {
- simulationEngine.run()
- }
+ private fun runSimulation() = testScope.advanceUntilIdle()
/**
* Obtain the trace reader for the test.
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
index 4c4dcf37..570b936d 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.format.environment
import com.atlarge.opendc.core.Environment
import kotlinx.coroutines.CoroutineScope
import java.io.Closeable
+import java.time.Clock
/**
* An interface for reading descriptions of topology environments into memory as [Environment].
@@ -35,5 +36,5 @@ interface EnvironmentReader : Closeable {
/**
* Construct an [Environment] in the specified [CoroutineScope].
*/
- suspend fun construct(coroutineScope: CoroutineScope): Environment
+ suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment
}
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 2b608aef..188d9fd8 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.format.environment.sc18
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -41,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
+import java.time.Clock
import java.util.UUID
/**
@@ -56,9 +56,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
-
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 49118675..d7845081 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -42,6 +41,7 @@ import kotlinx.coroutines.CoroutineScope
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
+import java.time.Clock
import java.util.Random
import java.util.UUID
@@ -57,9 +57,7 @@ class Sc20ClusterEnvironmentReader(
constructor(file: File) : this(FileInputStream(file))
@Suppress("BlockingMethodInNonBlockingContext")
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
-
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index f22f595f..adfa1cf0 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -42,6 +41,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
+import java.time.Clock
import java.util.UUID
/**
@@ -56,8 +56,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts
index 6f725de1..1d263e75 100644
--- a/simulator/opendc/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc/opendc-runner-web/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
implementation(project(":opendc:opendc-compute"))
implementation(project(":opendc:opendc-format"))
implementation(project(":opendc:opendc-experiments-sc20"))
+ implementation(project(":opendc:opendc-simulator"))
implementation("com.github.ajalt:clikt:2.8.0")
implementation("io.github.microutils:kotlin-logging:1.7.10")
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index 9cfe5531..ac4d9087 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.runner.web
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.*
import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
@@ -24,8 +23,10 @@ import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
import org.bson.Document
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
import java.util.*
import kotlin.random.Random
@@ -33,13 +34,9 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* Represents the CLI command for starting the OpenDC web runner.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
class RunnerCli : CliktCommand(name = "runner") {
/**
* The name of the database to use.
@@ -195,8 +192,8 @@ class RunnerCli : CliktCommand(name = "runner") {
val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
val seeder = Random(seed)
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val chan = Channel<Unit>(Channel.CONFLATED)
@@ -230,9 +227,10 @@ class RunnerCli : CliktCommand(name = "runner") {
4096
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -240,6 +238,8 @@ class RunnerCli : CliktCommand(name = "runner") {
val failureDomain = if (operational.getBoolean("failuresEnabled")) {
logger.debug("ENABLING failures")
createFailureDomain(
+ testScope,
+ clock,
seeder.nextInt(),
operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
bareMetalProvisioner,
@@ -249,8 +249,10 @@ class RunnerCli : CliktCommand(name = "runner") {
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -269,9 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") {
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
index ab683985..f9b1c6c4 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.runner.web
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -23,6 +22,7 @@ import com.mongodb.client.model.Projections
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.bson.Document
+import java.time.Clock
import java.util.*
/**
@@ -32,8 +32,7 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
/**
* Parse the topology with the specified [id].
*/
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
val nodes = mutableListOf<SimpleBareMetalDriver>()
val random = Random(0)
diff --git a/simulator/opendc/opendc-workflows/build.gradle.kts b/simulator/opendc/opendc-workflows/build.gradle.kts
index 893c9020..62c4bc25 100644
--- a/simulator/opendc/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc/opendc-workflows/build.gradle.kts
@@ -33,7 +33,7 @@ dependencies {
api(project(":opendc:opendc-core"))
api(project(":opendc:opendc-compute"))
- testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testImplementation(project(":opendc:opendc-simulator"))
testImplementation(project(":opendc:opendc-format"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 655d8e1d..114003a3 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
*/
@DisplayName("StageWorkflowService")
+@OptIn(ExperimentalCoroutinesApi::class)
internal class StageWorkflowSchedulerIntegrationTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
@@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
- val clock = simulationContext.clock
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(testScope, clock) }
StageWorkflowService(
- schedulerDomain,
+ testScope,
clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
@@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
.collect()
}
- broker.launch {
- val ctx = simulationContext
+ testScope.launch {
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
jobsSubmitted++
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
+ assertNotEquals(0, jobsSubmitted, "No jobs submitted")
assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started")
assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished")
assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished")