summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt3
-rw-r--r--simulator/opendc/opendc-compute/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt12
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt31
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt25
-rw-r--r--simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt41
-rw-r--r--simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt11
-rw-r--r--simulator/opendc/opendc-experiments-sc18/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt39
-rw-r--r--simulator/opendc/opendc-experiments-sc20/build.gradle.kts1
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt61
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt29
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt52
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt3
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt6
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt6
-rw-r--r--simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt5
-rw-r--r--simulator/opendc/opendc-runner-web/build.gradle.kts1
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt27
-rw-r--r--simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt5
-rw-r--r--simulator/opendc/opendc-workflows/build.gradle.kts2
-rw-r--r--simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt39
22 files changed, 177 insertions, 226 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
index 5d9af9ec..4918a535 100644
--- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
+++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt
@@ -32,7 +32,6 @@ import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.consumeAsFlow
-import java.util.WeakHashMap
/**
* A [Flow] that can be used to emit events.
@@ -61,7 +60,7 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl()
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
private class EventFlowImpl<T> : EventFlow<T> {
private var closed: Boolean = false
- private val subscribers = WeakHashMap<SendChannel<T>, Unit>()
+ private val subscribers = HashMap<SendChannel<T>, Unit>()
override fun emit(event: T) {
synchronized(this) {
diff --git a/simulator/opendc/opendc-compute/build.gradle.kts b/simulator/opendc/opendc-compute/build.gradle.kts
index 376c4269..0e44785e 100644
--- a/simulator/opendc/opendc-compute/build.gradle.kts
+++ b/simulator/opendc/opendc-compute/build.gradle.kts
@@ -34,7 +34,7 @@ dependencies {
api(project(":opendc:opendc-core"))
implementation("io.github.microutils:kotlin-logging:1.7.9")
- testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testImplementation(project(":opendc:opendc-simulator"))
testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index df45f440..bd266208 100644
--- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -39,18 +39,10 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.core.services.ServiceKey
import com.atlarge.opendc.core.services.ServiceRegistry
-import kotlinx.coroutines.CancellationException
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Job
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.intrinsics.startCoroutineCancellable
-import kotlinx.coroutines.launch
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.selects.select
@@ -100,6 +92,8 @@ class SimpleVirtDriver(
init {
launch {
try {
+ // Yield first to allow class variables to initialize
+ yield()
scheduler()
} catch (e: Exception) {
if (e !is CancellationException) {
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index 80c9c547..7b57327e 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -24,53 +24,52 @@
package com.atlarge.opendc.compute.metal.driver
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID
+@OptIn(ExperimentalCoroutinesApi::class)
internal class SimpleBareMetalDriverTest {
/**
* A smoke test for the bare-metal driver.
*/
@Test
fun smoke() {
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+
var finalState: ServerState = ServerState.BUILD
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("sim")
- val root = system.newDomain(name = "root")
- root.launch {
- val dom = root.newDomain(name = "driver")
+ testScope.launch {
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2)
// Batch driver commands
- withContext(dom.coroutineContext) {
+ withContext(coroutineContext) {
driver.init()
driver.setImage(image)
val server = driver.start().server!!
driver.usage
- .onEach { println("${simulationContext.clock.millis()} $it") }
+ .onEach { println("${clock.millis()} $it") }
.launchIn(this)
server.events.collect { event ->
when (event) {
is ServerEvent.StateChanged -> {
- println("${simulationContext.clock.millis()} $event")
+ println("${clock.millis()} $event")
finalState = event.server.state
}
}
@@ -78,11 +77,7 @@ internal class SimpleBareMetalDriverTest {
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
-
+ testScope.advanceUntilIdle()
assertEquals(ServerState.SHUTOFF, finalState)
}
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
index 37cd5898..0a85e0f9 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt
@@ -24,40 +24,38 @@
package com.atlarge.opendc.compute.metal.service
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID
/**
* Test suite for the [SimpleProvisioningService].
*/
+@OptIn(ExperimentalCoroutinesApi::class)
internal class SimpleProvisioningServiceTest {
/**
* A basic smoke test.
*/
@Test
fun smoke() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("sim")
- val root = system.newDomain(name = "root")
- root.launch {
- val clock = simulationContext.clock
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
+
+ testScope.launch {
val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2)
- val dom = root.newDomain("provisioner")
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4)
val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) }
- val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
val provisioner = SimpleProvisioningService()
provisioner.create(driver)
@@ -67,9 +65,6 @@ internal class SimpleProvisioningServiceTest {
node.server!!.events.collect { println(it) }
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}
}
diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
index 528434b1..dca0b292 100644
--- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
+++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.compute.virt
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -39,17 +37,18 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.util.UUID
/**
* Basic test-suite for the hypervisor.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
internal class HypervisorTest {
/**
* A smoke test for the bare-metal driver.
@@ -58,21 +57,17 @@ internal class HypervisorTest {
@Test
@Disabled
fun smoke() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- root.launch {
- val clock = simulationContext.clock
+ testScope.launch {
val vmm = HypervisorImage
val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1)
val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1)
- val driverDom = root.newDomain("driver")
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1)
val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -90,10 +85,7 @@ internal class HypervisorTest {
vmB.events.onEach { println(it) }.launchIn(this)
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}
/**
@@ -101,16 +93,14 @@ internal class HypervisorTest {
*/
@Test
fun overcommission() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider("test")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
var requestedBurst = 0L
var grantedBurst = 0L
var overcommissionedBurst = 0L
- root.launch {
- val clock = simulationContext.clock
+ testScope.launch {
val vmm = HypervisorImage
val duration = 5 * 60L
val vmImageA = VmImage(
@@ -140,11 +130,9 @@ internal class HypervisorTest {
0
)
- val driverDom = root.newDomain("driver")
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) }
- val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
+ val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList())
metalDriver.init()
metalDriver.setImage(vmm)
@@ -170,10 +158,7 @@ internal class HypervisorTest {
vmDriver.spawn("b", vmImageB, flavor)
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
assertAll(
{ assertEquals(2073600, requestedBurst, "Requested Burst does not match") },
diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
index 1b896858..e96974f7 100644
--- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
+++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt
@@ -24,9 +24,9 @@
package com.atlarge.opendc.core.failure
-import com.atlarge.odcsim.simulationContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
+import java.time.Clock
import kotlin.math.ln1p
import kotlin.math.pow
import kotlin.random.Random
@@ -35,7 +35,12 @@ import kotlin.random.Random
* A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are
* independent.
*/
-public class UncorrelatedFaultInjector(private val alpha: Double, private val beta: Double, private val random: Random = Random(0)) : FaultInjector {
+class UncorrelatedFaultInjector(
+ private val clock: Clock,
+ private val alpha: Double,
+ private val beta: Double,
+ private val random: Random = Random(0)
+) : FaultInjector {
/**
* Enqueue the specified [FailureDomain] to fail some time in the future.
*/
@@ -44,7 +49,7 @@ public class UncorrelatedFaultInjector(private val alpha: Double, private val be
val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds
// Handle long overflow
- if (simulationContext.clock.millis() + d <= 0) {
+ if (clock.millis() + d <= 0) {
return@launch
}
diff --git a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts
index 2e366a43..5e01f387 100644
--- a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts
+++ b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts
@@ -38,9 +38,9 @@ dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
implementation(project(":opendc:opendc-workflows"))
+ implementation(project(":opendc:opendc-simulator"))
implementation(kotlin("stdlib"))
- runtimeOnly(project(":odcsim:odcsim-engine-omega"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}")
diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
index c7577824..0cece647 100644
--- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
+++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.experiments.sc18
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -38,20 +36,19 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.math.max
/**
* Main entry point of the experiment.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
if (args.isEmpty()) {
println("error: Please provide path to GWF trace")
@@ -62,17 +59,16 @@ fun main(args: Array<String>) {
var finished = 0
val token = Channel<Boolean>()
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(this, clock) }
StageWorkflowService(
- schedulerDomain,
- simulationContext.clock,
+ this,
+ clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
jobAdmissionPolicy = NullJobAdmissionPolicy,
@@ -84,9 +80,7 @@ fun main(args: Array<String>) {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -106,21 +100,18 @@ fun main(args: Array<String>) {
}
.collect()
}
- broker.launch {
- val ctx = simulationContext
+
+ testScope.launch {
val reader = GwfTraceReader(File(args[0]))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
total += 1
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
}
diff --git a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
index a66612e4..713354e7 100644
--- a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -38,6 +38,7 @@ application {
dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
+ implementation(project(":opendc:opendc-simulator"))
implementation("com.github.ajalt:clikt:2.6.0")
implementation("me.tongfei:progressbar:0.8.1")
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 3765f307..b68ee97e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
@@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
+import java.time.Clock
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {}
* Construct the failure domain for the experiments.
*/
suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
seed: Int,
failureInterval: Double,
bareMetalProvisioner: ProvisioningService,
chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
+): CoroutineScope {
+ val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
@@ -86,7 +81,8 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext,
+ this,
+ clock,
random,
failureInterval
)
@@ -94,18 +90,18 @@ suspend fun createFailureDomain(
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
- return domain
+ return CoroutineScope(coroutineScope.coroutineContext + job)
}
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- simulationContext.domain,
- simulationContext.clock,
+ coroutineScope,
+ clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
suspend fun createProvisioner(
- root: Domain,
+ coroutineScope: CoroutineScope,
+ clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
+): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
- bareMetalProvisioner to scheduler
+ return bareMetalProvisioner to scheduler
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
- val domain = simulationContext.domain
- val clock = simulationContext.clock
+suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
@@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
}
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
hypervisor.events
.onEach { event ->
when (event) {
@@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
val driver = hypervisor.server.services[BareMetalDriver.Key]
driver.powerDraw
.onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
scheduler.events
@@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
+suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
try {
var submitted = 0
@@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val (time, workload) = reader.next()
submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
chan.send(Unit)
val server = scheduler.deploy(
workload.image.name,
@@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
server.events
.onEach {
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
+ monitor.reportVmStateChange(clock.millis(), it.server)
}
}
.collect()
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 7b42b095..76a10e56 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
@@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.random.Random
/**
@@ -52,19 +53,15 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
* same set of parameters.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
override suspend fun invoke(context: ExperimentExecutionContext) {
val experiment = parent.parent.parent
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val seeder = Random(seed)
val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
@@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
parent.parent.parent.bufferSize
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seeder.nextInt(),
parent.operationalPhenomena.failureFrequency,
bareMetalProvisioner,
@@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5ecf7605..ebee1543 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationEngine
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
@@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
+import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
class Sc20IntegrationTest {
/**
- * The simulation engine to use.
+ * The [TestCoroutineScope] to use.
*/
- private lateinit var simulationEngine: SimulationEngine
+ private lateinit var testScope: TestCoroutineScope
/**
- * The root simulation domain to run in.
+ * The simulation clock to use.
*/
- private lateinit var root: Domain
+ private lateinit var clock: Clock
/**
* The monitor used to keep track of the metrics.
@@ -79,9 +79,9 @@ class Sc20IntegrationTest {
*/
@BeforeEach
fun setUp() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- simulationEngine = provider("test")
- root = simulationEngine.newDomain("root")
+ testScope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(testScope)
+
monitor = TestExperimentReporter()
}
@@ -89,9 +89,7 @@ class Sc20IntegrationTest {
* Tear down the experimental environment.
*/
@AfterEach
- fun tearDown() = runBlocking {
- simulationEngine.terminate()
- }
+ fun tearDown() = testScope.cleanupTestCoroutines()
@Test
fun smoke() {
@@ -103,9 +101,10 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader()
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
@@ -115,6 +114,8 @@ class Sc20IntegrationTest {
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seed,
24.0 * 7,
bareMetalProvisioner,
@@ -124,8 +125,10 @@ class Sc20IntegrationTest {
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -159,16 +162,19 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader("single")
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
scheduler = res.second
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -195,9 +201,7 @@ class Sc20IntegrationTest {
/**
* Run the simulation.
*/
- private fun runSimulation() = runBlocking {
- simulationEngine.run()
- }
+ private fun runSimulation() = testScope.advanceUntilIdle()
/**
* Obtain the trace reader for the test.
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
index 4c4dcf37..570b936d 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt
@@ -27,6 +27,7 @@ package com.atlarge.opendc.format.environment
import com.atlarge.opendc.core.Environment
import kotlinx.coroutines.CoroutineScope
import java.io.Closeable
+import java.time.Clock
/**
* An interface for reading descriptions of topology environments into memory as [Environment].
@@ -35,5 +36,5 @@ interface EnvironmentReader : Closeable {
/**
* Construct an [Environment] in the specified [CoroutineScope].
*/
- suspend fun construct(coroutineScope: CoroutineScope): Environment
+ suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment
}
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
index 2b608aef..188d9fd8 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.format.environment.sc18
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -41,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
+import java.time.Clock
import java.util.UUID
/**
@@ -56,9 +56,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
-
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
index 49118675..d7845081 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -42,6 +41,7 @@ import kotlinx.coroutines.CoroutineScope
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
+import java.time.Clock
import java.util.Random
import java.util.UUID
@@ -57,9 +57,7 @@ class Sc20ClusterEnvironmentReader(
constructor(file: File) : this(FileInputStream(file))
@Suppress("BlockingMethodInNonBlockingContext")
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
-
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
var clusterIdCol = 0
var speedCol = 0
var numberOfHostsCol = 0
diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
index f22f595f..adfa1cf0 100644
--- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
+++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.format.environment.sc20
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -42,6 +41,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import kotlinx.coroutines.CoroutineScope
import java.io.InputStream
+import java.time.Clock
import java.util.UUID
/**
@@ -56,8 +56,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb
*/
private val setup: Setup = mapper.readValue(input)
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
var counter = 0
val nodes = setup.rooms.flatMap { room ->
room.objects.flatMap { roomObject ->
diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts
index 6f725de1..1d263e75 100644
--- a/simulator/opendc/opendc-runner-web/build.gradle.kts
+++ b/simulator/opendc/opendc-runner-web/build.gradle.kts
@@ -39,6 +39,7 @@ dependencies {
implementation(project(":opendc:opendc-compute"))
implementation(project(":opendc:opendc-format"))
implementation(project(":opendc:opendc-experiments-sc20"))
+ implementation(project(":opendc:opendc-simulator"))
implementation("com.github.ajalt:clikt:2.8.0")
implementation("io.github.microutils:kotlin-logging:1.7.10")
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
index 9cfe5531..ac4d9087 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.runner.web
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.*
import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor
import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain
@@ -24,8 +23,10 @@ import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Filters
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
import org.bson.Document
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
import java.util.*
import kotlin.random.Random
@@ -33,13 +34,9 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* Represents the CLI command for starting the OpenDC web runner.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
class RunnerCli : CliktCommand(name = "runner") {
/**
* The name of the database to use.
@@ -195,8 +192,8 @@ class RunnerCli : CliktCommand(name = "runner") {
val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble()
val seeder = Random(seed)
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val chan = Channel<Unit>(Channel.CONFLATED)
@@ -230,9 +227,10 @@ class RunnerCli : CliktCommand(name = "runner") {
4096
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -240,6 +238,8 @@ class RunnerCli : CliktCommand(name = "runner") {
val failureDomain = if (operational.getBoolean("failuresEnabled")) {
logger.debug("ENABLING failures")
createFailureDomain(
+ testScope,
+ clock,
seeder.nextInt(),
operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7,
bareMetalProvisioner,
@@ -249,8 +249,10 @@ class RunnerCli : CliktCommand(name = "runner") {
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -269,9 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") {
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}
diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
index ab683985..f9b1c6c4 100644
--- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
+++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt
@@ -1,6 +1,5 @@
package com.atlarge.opendc.runner.web
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.MemoryUnit
import com.atlarge.opendc.compute.core.ProcessingNode
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -23,6 +22,7 @@ import com.mongodb.client.model.Projections
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.bson.Document
+import java.time.Clock
import java.util.*
/**
@@ -32,8 +32,7 @@ class TopologyParser(private val collection: MongoCollection<Document>, private
/**
* Parse the topology with the specified [id].
*/
- override suspend fun construct(coroutineScope: CoroutineScope): Environment {
- val clock = simulationContext.clock
+ override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment {
val nodes = mutableListOf<SimpleBareMetalDriver>()
val random = Random(0)
diff --git a/simulator/opendc/opendc-workflows/build.gradle.kts b/simulator/opendc/opendc-workflows/build.gradle.kts
index 893c9020..62c4bc25 100644
--- a/simulator/opendc/opendc-workflows/build.gradle.kts
+++ b/simulator/opendc/opendc-workflows/build.gradle.kts
@@ -33,7 +33,7 @@ dependencies {
api(project(":opendc:opendc-core"))
api(project(":opendc:opendc-compute"))
- testRuntimeOnly(project(":odcsim:odcsim-engine-omega"))
+ testImplementation(project(":opendc:opendc-simulator"))
testImplementation(project(":opendc:opendc-format"))
testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}")
diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
index 655d8e1d..114003a3 100644
--- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
+++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt
@@ -24,8 +24,6 @@
package com.atlarge.opendc.workflows.service
-import com.atlarge.odcsim.SimulationEngineProvider
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.metal.service.ProvisioningService
import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader
import com.atlarge.opendc.format.trace.gwf.GwfTraceReader
@@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec
import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy
import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy
import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy
-import kotlinx.coroutines.async
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
-import java.util.ServiceLoader
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import kotlin.math.max
/**
* Integration test suite for the [StageWorkflowService].
*/
@DisplayName("StageWorkflowService")
+@OptIn(ExperimentalCoroutinesApi::class)
internal class StageWorkflowSchedulerIntegrationTest {
/**
* A large integration test where we check whether all tasks in some trace are executed correctly.
@@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest {
var tasksStarted = 0L
var tasksFinished = 0L
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- val system = provider(name = "sim")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
- val schedulerDomain = system.newDomain(name = "scheduler")
- val schedulerAsync = schedulerDomain.async {
- val clock = simulationContext.clock
+ val schedulerAsync = testScope.async {
val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json"))
- .use { it.construct(schedulerDomain) }
+ .use { it.construct(testScope, clock) }
StageWorkflowService(
- schedulerDomain,
+ testScope,
clock,
environment.platforms[0].zones[0].services[ProvisioningService],
mode = WorkflowSchedulerMode.Batch(100),
@@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest {
)
}
- val broker = system.newDomain(name = "broker")
-
- broker.launch {
+ testScope.launch {
val scheduler = schedulerAsync.await()
scheduler.events
.onEach { event ->
@@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest {
.collect()
}
- broker.launch {
- val ctx = simulationContext
+ testScope.launch {
val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf"))
val scheduler = schedulerAsync.await()
while (reader.hasNext()) {
val (time, job) = reader.next()
jobsSubmitted++
- delay(max(0, time * 1000 - ctx.clock.millis()))
+ delay(max(0, time * 1000 - clock.millis()))
scheduler.submit(job)
}
}
- runBlocking {
- system.run()
- system.terminate()
- }
+ testScope.advanceUntilIdle()
+ assertNotEquals(0, jobsSubmitted, "No jobs submitted")
assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started")
assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished")
assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished")