diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 23:56:07 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 23:56:07 +0200 |
| commit | fcae560208df4860bc7461f955bf3b522b0e61c5 (patch) | |
| tree | 933f47f1061274a6a7e648da82c13f08fce41ea5 /simulator | |
| parent | 1766888d6dde44f96508a4bc6878978ddcaa073d (diff) | |
Migrate from Domain to TestCoroutineScope
This change eliminates the use of Domain and simulationContext in favour
of the generic (Test)CoroutineScope and Clock classes. In this way, we
decouple the OpenDC modules and their logic from simulation-related
code.
In this way, we also simplify eventual attempt for emulating OpenDC
componments in real-time.
Diffstat (limited to 'simulator')
22 files changed, 177 insertions, 226 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt index 5d9af9ec..4918a535 100644 --- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt +++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt @@ -32,7 +32,6 @@ import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.consumeAsFlow -import java.util.WeakHashMap /** * A [Flow] that can be used to emit events. @@ -61,7 +60,7 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl<T> : EventFlow<T> { private var closed: Boolean = false - private val subscribers = WeakHashMap<SendChannel<T>, Unit>() + private val subscribers = HashMap<SendChannel<T>, Unit>() override fun emit(event: T) { synchronized(this) { diff --git a/simulator/opendc/opendc-compute/build.gradle.kts b/simulator/opendc/opendc-compute/build.gradle.kts index 376c4269..0e44785e 100644 --- a/simulator/opendc/opendc-compute/build.gradle.kts +++ b/simulator/opendc/opendc-compute/build.gradle.kts @@ -34,7 +34,7 @@ dependencies { api(project(":opendc:opendc-core")) implementation("io.github.microutils:kotlin-logging:1.7.9") - testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation(project(":opendc:opendc-simulator")) testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index df45f440..bd266208 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -39,18 +39,10 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select @@ -100,6 +92,8 @@ class SimpleVirtDriver( init { launch { try { + // Yield first to allow class variables to initialize + yield() scheduler() } catch (e: Exception) { if (e !is CancellationException) { diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 80c9c547..7b57327e 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -24,53 +24,52 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.util.UUID +@OptIn(ExperimentalCoroutinesApi::class) internal class SimpleBareMetalDriverTest { /** * A smoke test for the bare-metal driver. */ @Test fun smoke() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + var finalState: ServerState = ServerState.BUILD - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("sim") - val root = system.newDomain(name = "root") - root.launch { - val dom = root.newDomain(name = "driver") + testScope.launch { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands - withContext(dom.coroutineContext) { + withContext(coroutineContext) { driver.init() driver.setImage(image) val server = driver.start().server!! driver.usage - .onEach { println("${simulationContext.clock.millis()} $it") } + .onEach { println("${clock.millis()} $it") } .launchIn(this) server.events.collect { event -> when (event) { is ServerEvent.StateChanged -> { - println("${simulationContext.clock.millis()} $event") + println("${clock.millis()} $event") finalState = event.server.state } } @@ -78,11 +77,7 @@ internal class SimpleBareMetalDriverTest { } } - runBlocking { - system.run() - system.terminate() - } - + testScope.advanceUntilIdle() assertEquals(ServerState.SHUTOFF, finalState) } } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 37cd5898..0a85e0f9 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -24,40 +24,38 @@ package com.atlarge.opendc.compute.metal.service -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. */ +@OptIn(ExperimentalCoroutinesApi::class) internal class SimpleProvisioningServiceTest { /** * A basic smoke test. */ @Test fun smoke() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("sim") - val root = system.newDomain(name = "root") - root.launch { - val clock = simulationContext.clock + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + + testScope.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) - val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val provisioner = SimpleProvisioningService() provisioner.create(driver) @@ -67,9 +65,6 @@ internal class SimpleProvisioningServiceTest { node.server!!.events.collect { println(it) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 528434b1..dca0b292 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.compute.virt -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -39,17 +37,18 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.util.UUID /** * Basic test-suite for the hypervisor. */ +@OptIn(ExperimentalCoroutinesApi::class) internal class HypervisorTest { /** * A smoke test for the bare-metal driver. @@ -58,21 +57,17 @@ internal class HypervisorTest { @Test @Disabled fun smoke() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - root.launch { - val clock = simulationContext.clock + testScope.launch { val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) - val driverDom = root.newDomain("driver") - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -90,10 +85,7 @@ internal class HypervisorTest { vmB.events.onEach { println(it) }.launchIn(this) } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } /** @@ -101,16 +93,14 @@ internal class HypervisorTest { */ @Test fun overcommission() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) var requestedBurst = 0L var grantedBurst = 0L var overcommissionedBurst = 0L - root.launch { - val clock = simulationContext.clock + testScope.launch { val vmm = HypervisorImage val duration = 5 * 60L val vmImageA = VmImage( @@ -140,11 +130,9 @@ internal class HypervisorTest { 0 ) - val driverDom = root.newDomain("driver") - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -170,10 +158,7 @@ internal class HypervisorTest { vmDriver.spawn("b", vmImageB, flavor) } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() assertAll( { assertEquals(2073600, requestedBurst, "Requested Burst does not match") }, diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 1b896858..e96974f7 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -24,9 +24,9 @@ package com.atlarge.opendc.core.failure -import com.atlarge.odcsim.simulationContext import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import java.time.Clock import kotlin.math.ln1p import kotlin.math.pow import kotlin.random.Random @@ -35,7 +35,12 @@ import kotlin.random.Random * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are * independent. */ -public class UncorrelatedFaultInjector(private val alpha: Double, private val beta: Double, private val random: Random = Random(0)) : FaultInjector { +class UncorrelatedFaultInjector( + private val clock: Clock, + private val alpha: Double, + private val beta: Double, + private val random: Random = Random(0) +) : FaultInjector { /** * Enqueue the specified [FailureDomain] to fail some time in the future. */ @@ -44,7 +49,7 @@ public class UncorrelatedFaultInjector(private val alpha: Double, private val be val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds // Handle long overflow - if (simulationContext.clock.millis() + d <= 0) { + if (clock.millis() + d <= 0) { return@launch } diff --git a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts index 2e366a43..5e01f387 100644 --- a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts @@ -38,9 +38,9 @@ dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(project(":opendc:opendc-workflows")) + implementation(project(":opendc:opendc-simulator")) implementation(kotlin("stdlib")) - runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index c7577824..0cece647 100644 --- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.experiments.sc18 -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -38,20 +36,19 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.math.max /** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { if (args.isEmpty()) { println("error: Please provide path to GWF trace") @@ -62,17 +59,16 @@ fun main(args: Array<String>) { var finished = 0 val token = Channel<Boolean>() - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(this, clock) } StageWorkflowService( - schedulerDomain, - simulationContext.clock, + this, + clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -84,9 +80,7 @@ fun main(args: Array<String>) { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -106,21 +100,18 @@ fun main(args: Array<String>) { } .collect() } - broker.launch { - val ctx = simulationContext + + testScope.launch { val reader = GwfTraceReader(File(args[0])) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() total += 1 - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } diff --git a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts index a66612e4..713354e7 100644 --- a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts @@ -38,6 +38,7 @@ application { dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) + implementation(project(":opendc:opendc-simulator")) implementation("com.github.ajalt:clikt:2.6.0") implementation("me.tongfei:progressbar:0.8.1") diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 3765f307..b68ee97e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel @@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File +import java.time.Clock import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {} * Construct the failure domain for the experiments. */ suspend fun createFailureDomain( + coroutineScope: CoroutineScope, + clock: Clock, seed: Int, failureInterval: Double, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit> -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { +): CoroutineScope { + val job = coroutineScope.launch { chan.receive() val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() @@ -86,7 +81,8 @@ suspend fun createFailureDomain( val injector = injectors.getOrPut(cluster) { createFaultInjector( - simulationContext, + this, + clock, random, failureInterval ) @@ -94,18 +90,18 @@ suspend fun createFailureDomain( injector.enqueue(node.metadata["driver"] as FailureDomain) } } - return domain + return CoroutineScope(coroutineScope.coroutineContext + job) } /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector { +fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( - simulationContext.domain, - simulationContext.clock, + coroutineScope, + clock, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes @@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter * Construct the environment for a VM provisioner and return the provisioner instance. */ suspend fun createProvisioner( - root: Domain, + coroutineScope: CoroutineScope, + clock: Clock, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy -): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } +): Pair<ProvisioningService, SimpleVirtProvisioningService> { + val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy) + val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy) // Wait for the hypervisors to be spawned delay(10) - bareMetalProvisioner to scheduler + return bareMetalProvisioner to scheduler } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { - val domain = simulationContext.domain - val clock = simulationContext.clock +suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { val hypervisors = scheduler.drivers() // Monitor hypervisor events @@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp } } } - .launchIn(domain) + .launchIn(coroutineScope) hypervisor.events .onEach { event -> when (event) { @@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp ) } } - .launchIn(domain) + .launchIn(coroutineScope) val driver = hypervisor.server.services[BareMetalDriver.Key] driver.powerDraw .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } - .launchIn(domain) + .launchIn(coroutineScope) } scheduler.events @@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp monitor.reportProvisionerMetrics(clock.millis(), event) } } - .launchIn(domain) + .launchIn(coroutineScope) } /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { - val domain = simulationContext.domain - +suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { try { var submitted = 0 @@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP val (time, workload) = reader.next() submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { + delay(max(0, time - clock.millis())) + coroutineScope.launch { chan.send(Unit) val server = scheduler.deploy( workload.image.name, @@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) + monitor.reportVmStateChange(clock.millis(), it.server) } } .collect() diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 7b42b095..76a10e56 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy @@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.random.Random /** @@ -52,19 +53,15 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the * same set of parameters. */ +@OptIn(ExperimentalCoroutinesApi::class) public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { override suspend fun invoke(context: ExperimentExecutionContext) { val experiment = parent.parent.parent - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val seeder = Random(seed) val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) @@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) parent.parent.parent.bufferSize ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( + this, + clock, seeder.nextInt(), parent.operationalPhenomena.failureFrequency, bareMetalProvisioner, @@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 5ecf7605..ebee1543 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngine -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService @@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader +import java.time.Clock /** * An integration test suite for the SC20 experiments. */ +@OptIn(ExperimentalCoroutinesApi::class) class Sc20IntegrationTest { /** - * The simulation engine to use. + * The [TestCoroutineScope] to use. */ - private lateinit var simulationEngine: SimulationEngine + private lateinit var testScope: TestCoroutineScope /** - * The root simulation domain to run in. + * The simulation clock to use. */ - private lateinit var root: Domain + private lateinit var clock: Clock /** * The monitor used to keep track of the metrics. @@ -79,9 +79,9 @@ class Sc20IntegrationTest { */ @BeforeEach fun setUp() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - simulationEngine = provider("test") - root = simulationEngine.newDomain("root") + testScope = TestCoroutineScope() + clock = DelayControllerClockAdapter(testScope) + monitor = TestExperimentReporter() } @@ -89,9 +89,7 @@ class Sc20IntegrationTest { * Tear down the experimental environment. */ @AfterEach - fun tearDown() = runBlocking { - simulationEngine.terminate() - } + fun tearDown() = testScope.cleanupTestCoroutines() @Test fun smoke() { @@ -103,9 +101,10 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader() lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) @@ -115,6 +114,8 @@ class Sc20IntegrationTest { val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( + this, + clock, seed, 24.0 * 7, bareMetalProvisioner, @@ -124,8 +125,10 @@ class Sc20IntegrationTest { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -159,16 +162,19 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader("single") lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) scheduler = res.second - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -195,9 +201,7 @@ class Sc20IntegrationTest { /** * Run the simulation. */ - private fun runSimulation() = runBlocking { - simulationEngine.run() - } + private fun runSimulation() = testScope.advanceUntilIdle() /** * Obtain the trace reader for the test. diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt index 4c4dcf37..570b936d 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.format.environment import com.atlarge.opendc.core.Environment import kotlinx.coroutines.CoroutineScope import java.io.Closeable +import java.time.Clock /** * An interface for reading descriptions of topology environments into memory as [Environment]. @@ -35,5 +36,5 @@ interface EnvironmentReader : Closeable { /** * Construct an [Environment] in the specified [CoroutineScope]. */ - suspend fun construct(coroutineScope: CoroutineScope): Environment + suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment } diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 2b608aef..188d9fd8 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.format.environment.sc18 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -41,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope import java.io.InputStream +import java.time.Clock import java.util.UUID /** @@ -56,9 +56,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock - + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 49118675..d7845081 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -42,6 +41,7 @@ import kotlinx.coroutines.CoroutineScope import java.io.File import java.io.FileInputStream import java.io.InputStream +import java.time.Clock import java.util.Random import java.util.UUID @@ -57,9 +57,7 @@ class Sc20ClusterEnvironmentReader( constructor(file: File) : this(FileInputStream(file)) @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock - + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index f22f595f..adfa1cf0 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -42,6 +41,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope import java.io.InputStream +import java.time.Clock import java.util.UUID /** @@ -56,8 +56,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts index 6f725de1..1d263e75 100644 --- a/simulator/opendc/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc/opendc-runner-web/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { implementation(project(":opendc:opendc-compute")) implementation(project(":opendc:opendc-format")) implementation(project(":opendc:opendc-experiments-sc20")) + implementation(project(":opendc:opendc-simulator")) implementation("com.github.ajalt:clikt:2.8.0") implementation("io.github.microutils:kotlin-logging:1.7.10") diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 9cfe5531..ac4d9087 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.* import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain @@ -24,8 +23,10 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File import java.util.* import kotlin.random.Random @@ -33,13 +34,9 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** * Represents the CLI command for starting the OpenDC web runner. */ +@OptIn(ExperimentalCoroutinesApi::class) class RunnerCli : CliktCommand(name = "runner") { /** * The name of the database to use. @@ -195,8 +192,8 @@ class RunnerCli : CliktCommand(name = "runner") { val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() val seeder = Random(seed) - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val chan = Channel<Unit>(Channel.CONFLATED) @@ -230,9 +227,10 @@ class RunnerCli : CliktCommand(name = "runner") { 4096 ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -240,6 +238,8 @@ class RunnerCli : CliktCommand(name = "runner") { val failureDomain = if (operational.getBoolean("failuresEnabled")) { logger.debug("ENABLING failures") createFailureDomain( + testScope, + clock, seeder.nextInt(), operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, bareMetalProvisioner, @@ -249,8 +249,10 @@ class RunnerCli : CliktCommand(name = "runner") { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -269,9 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") { } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt index ab683985..f9b1c6c4 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -23,6 +22,7 @@ import com.mongodb.client.model.Projections import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import java.time.Clock import java.util.* /** @@ -32,8 +32,7 @@ class TopologyParser(private val collection: MongoCollection<Document>, private /** * Parse the topology with the specified [id]. */ - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { val nodes = mutableListOf<SimpleBareMetalDriver>() val random = Random(0) diff --git a/simulator/opendc/opendc-workflows/build.gradle.kts b/simulator/opendc/opendc-workflows/build.gradle.kts index 893c9020..62c4bc25 100644 --- a/simulator/opendc/opendc-workflows/build.gradle.kts +++ b/simulator/opendc/opendc-workflows/build.gradle.kts @@ -33,7 +33,7 @@ dependencies { api(project(":opendc:opendc-core")) api(project(":opendc:opendc-compute")) - testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation(project(":opendc:opendc-simulator")) testImplementation(project(":opendc:opendc-format")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 655d8e1d..114003a3 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. */ @DisplayName("StageWorkflowService") +@OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { - val clock = simulationContext.clock + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(testScope, clock) } StageWorkflowService( - schedulerDomain, + testScope, clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), @@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest { .collect() } - broker.launch { - val ctx = simulationContext + testScope.launch { val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() jobsSubmitted++ - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() + assertNotEquals(0, jobsSubmitted, "No jobs submitted") assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") |
