diff options
22 files changed, 177 insertions, 226 deletions
diff --git a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt index 5d9af9ec..4918a535 100644 --- a/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt +++ b/simulator/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/flow/EventFlow.kt @@ -32,7 +32,6 @@ import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.consumeAsFlow -import java.util.WeakHashMap /** * A [Flow] that can be used to emit events. @@ -61,7 +60,7 @@ public fun <T> EventFlow(): EventFlow<T> = EventFlowImpl() @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) private class EventFlowImpl<T> : EventFlow<T> { private var closed: Boolean = false - private val subscribers = WeakHashMap<SendChannel<T>, Unit>() + private val subscribers = HashMap<SendChannel<T>, Unit>() override fun emit(event: T) { synchronized(this) { diff --git a/simulator/opendc/opendc-compute/build.gradle.kts b/simulator/opendc/opendc-compute/build.gradle.kts index 376c4269..0e44785e 100644 --- a/simulator/opendc/opendc-compute/build.gradle.kts +++ b/simulator/opendc/opendc-compute/build.gradle.kts @@ -34,7 +34,7 @@ dependencies { api(project(":opendc:opendc-core")) implementation("io.github.microutils:kotlin-logging:1.7.9") - testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation(project(":opendc:opendc-simulator")) testRuntimeOnly("org.slf4j:slf4j-simple:${Library.SLF4J}") testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index df45f440..bd266208 100644 --- a/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/simulator/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -39,18 +39,10 @@ import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.core.services.ServiceKey import com.atlarge.opendc.core.services.ServiceRegistry -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.intrinsics.startCoroutineCancellable -import kotlinx.coroutines.launch import kotlinx.coroutines.selects.SelectClause0 import kotlinx.coroutines.selects.SelectInstance import kotlinx.coroutines.selects.select @@ -100,6 +92,8 @@ class SimpleVirtDriver( init { launch { try { + // Yield first to allow class variables to initialize + yield() scheduler() } catch (e: Exception) { if (e !is CancellationException) { diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 80c9c547..7b57327e 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -24,53 +24,52 @@ package com.atlarge.opendc.compute.metal.driver -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.image.FlopsApplicationImage +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.util.UUID +@OptIn(ExperimentalCoroutinesApi::class) internal class SimpleBareMetalDriverTest { /** * A smoke test for the bare-metal driver. */ @Test fun smoke() { + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + var finalState: ServerState = ServerState.BUILD - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("sim") - val root = system.newDomain(name = "root") - root.launch { - val dom = root.newDomain(name = "driver") + testScope.launch { val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom, simulationContext.clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 2) // Batch driver commands - withContext(dom.coroutineContext) { + withContext(coroutineContext) { driver.init() driver.setImage(image) val server = driver.start().server!! driver.usage - .onEach { println("${simulationContext.clock.millis()} $it") } + .onEach { println("${clock.millis()} $it") } .launchIn(this) server.events.collect { event -> when (event) { is ServerEvent.StateChanged -> { - println("${simulationContext.clock.millis()} $event") + println("${clock.millis()} $event") finalState = event.server.state } } @@ -78,11 +77,7 @@ internal class SimpleBareMetalDriverTest { } } - runBlocking { - system.run() - system.terminate() - } - + testScope.advanceUntilIdle() assertEquals(ServerState.SHUTOFF, finalState) } } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt index 37cd5898..0a85e0f9 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/service/SimpleProvisioningServiceTest.kt @@ -24,40 +24,38 @@ package com.atlarge.opendc.compute.metal.service -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.driver.SimpleBareMetalDriver +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.util.UUID /** * Test suite for the [SimpleProvisioningService]. */ +@OptIn(ExperimentalCoroutinesApi::class) internal class SimpleProvisioningServiceTest { /** * A basic smoke test. */ @Test fun smoke() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("sim") - val root = system.newDomain(name = "root") - root.launch { - val clock = simulationContext.clock + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) + + testScope.launch { val image = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1000, 2) - val dom = root.newDomain("provisioner") val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 4) val cpus = List(4) { ProcessingUnit(cpuNode, it, 2400.0) } - val driver = SimpleBareMetalDriver(dom.newDomain(), clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val driver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) val provisioner = SimpleProvisioningService() provisioner.create(driver) @@ -67,9 +65,6 @@ internal class SimpleProvisioningServiceTest { node.server!!.events.collect { println(it) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } } diff --git a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt index 528434b1..dca0b292 100644 --- a/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt +++ b/simulator/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/virt/HypervisorTest.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.compute.virt -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -39,17 +37,18 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.util.UUID /** * Basic test-suite for the hypervisor. */ +@OptIn(ExperimentalCoroutinesApi::class) internal class HypervisorTest { /** * A smoke test for the bare-metal driver. @@ -58,21 +57,17 @@ internal class HypervisorTest { @Test @Disabled fun smoke() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - root.launch { - val clock = simulationContext.clock + testScope.launch { val vmm = HypervisorImage val workloadA = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 1_000, 1) val workloadB = FlopsApplicationImage(UUID.randomUUID(), "<unnamed>", emptyMap(), 2_000, 1) - val driverDom = root.newDomain("driver") - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 1) val cpus = List(1) { ProcessingUnit(cpuNode, it, 2000.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -90,10 +85,7 @@ internal class HypervisorTest { vmB.events.onEach { println(it) }.launchIn(this) } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } /** @@ -101,16 +93,14 @@ internal class HypervisorTest { */ @Test fun overcommission() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider("test") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) var requestedBurst = 0L var grantedBurst = 0L var overcommissionedBurst = 0L - root.launch { - val clock = simulationContext.clock + testScope.launch { val vmm = HypervisorImage val duration = 5 * 60L val vmImageA = VmImage( @@ -140,11 +130,9 @@ internal class HypervisorTest { 0 ) - val driverDom = root.newDomain("driver") - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) val cpus = List(2) { ProcessingUnit(cpuNode, it, 3200.0) } - val metalDriver = SimpleBareMetalDriver(driverDom, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) + val metalDriver = SimpleBareMetalDriver(this, clock, UUID.randomUUID(), "test", emptyMap(), cpus, emptyList()) metalDriver.init() metalDriver.setImage(vmm) @@ -170,10 +158,7 @@ internal class HypervisorTest { vmDriver.spawn("b", vmImageB, flavor) } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() assertAll( { assertEquals(2073600, requestedBurst, "Requested Burst does not match") }, diff --git a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 1b896858..e96974f7 100644 --- a/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/simulator/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -24,9 +24,9 @@ package com.atlarge.opendc.core.failure -import com.atlarge.odcsim.simulationContext import kotlinx.coroutines.delay import kotlinx.coroutines.launch +import java.time.Clock import kotlin.math.ln1p import kotlin.math.pow import kotlin.random.Random @@ -35,7 +35,12 @@ import kotlin.random.Random * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are * independent. */ -public class UncorrelatedFaultInjector(private val alpha: Double, private val beta: Double, private val random: Random = Random(0)) : FaultInjector { +class UncorrelatedFaultInjector( + private val clock: Clock, + private val alpha: Double, + private val beta: Double, + private val random: Random = Random(0) +) : FaultInjector { /** * Enqueue the specified [FailureDomain] to fail some time in the future. */ @@ -44,7 +49,7 @@ public class UncorrelatedFaultInjector(private val alpha: Double, private val be val d = random.weibull(alpha, beta) * 1e3 // Make sure to convert delay to milliseconds // Handle long overflow - if (simulationContext.clock.millis() + d <= 0) { + if (clock.millis() + d <= 0) { return@launch } diff --git a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts index 2e366a43..5e01f387 100644 --- a/simulator/opendc/opendc-experiments-sc18/build.gradle.kts +++ b/simulator/opendc/opendc-experiments-sc18/build.gradle.kts @@ -38,9 +38,9 @@ dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) implementation(project(":opendc:opendc-workflows")) + implementation(project(":opendc:opendc-simulator")) implementation(kotlin("stdlib")) - runtimeOnly(project(":odcsim:odcsim-engine-omega")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") testImplementation("org.junit.platform:junit-platform-launcher:${Library.JUNIT_PLATFORM}") diff --git a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt index c7577824..0cece647 100644 --- a/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt +++ b/simulator/opendc/opendc-experiments-sc18/src/main/kotlin/com/atlarge/opendc/experiments/sc18/TestExperiment.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.experiments.sc18 -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -38,20 +36,19 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.math.max /** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { if (args.isEmpty()) { println("error: Please provide path to GWF trace") @@ -62,17 +59,16 @@ fun main(args: Array<String>) { var finished = 0 val token = Channel<Boolean>() - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/env/setup-test.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(this, clock) } StageWorkflowService( - schedulerDomain, - simulationContext.clock, + this, + clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), jobAdmissionPolicy = NullJobAdmissionPolicy, @@ -84,9 +80,7 @@ fun main(args: Array<String>) { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -106,21 +100,18 @@ fun main(args: Array<String>) { } .collect() } - broker.launch { - val ctx = simulationContext + + testScope.launch { val reader = GwfTraceReader(File(args[0])) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() total += 1 - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() } diff --git a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts index a66612e4..713354e7 100644 --- a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts @@ -38,6 +38,7 @@ application { dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) + implementation(project(":opendc:opendc-simulator")) implementation("com.github.ajalt:clikt:2.6.0") implementation("me.tongfei:progressbar:0.8.1") diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 3765f307..b68ee97e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel @@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File +import java.time.Clock import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {} * Construct the failure domain for the experiments. */ suspend fun createFailureDomain( + coroutineScope: CoroutineScope, + clock: Clock, seed: Int, failureInterval: Double, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit> -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { +): CoroutineScope { + val job = coroutineScope.launch { chan.receive() val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() @@ -86,7 +81,8 @@ suspend fun createFailureDomain( val injector = injectors.getOrPut(cluster) { createFaultInjector( - simulationContext, + this, + clock, random, failureInterval ) @@ -94,18 +90,18 @@ suspend fun createFailureDomain( injector.enqueue(node.metadata["driver"] as FailureDomain) } } - return domain + return CoroutineScope(coroutineScope.coroutineContext + job) } /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector { +fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( - simulationContext.domain, - simulationContext.clock, + coroutineScope, + clock, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes @@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter * Construct the environment for a VM provisioner and return the provisioner instance. */ suspend fun createProvisioner( - root: Domain, + coroutineScope: CoroutineScope, + clock: Clock, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy -): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } +): Pair<ProvisioningService, SimpleVirtProvisioningService> { + val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy) + val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy) // Wait for the hypervisors to be spawned delay(10) - bareMetalProvisioner to scheduler + return bareMetalProvisioner to scheduler } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { - val domain = simulationContext.domain - val clock = simulationContext.clock +suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { val hypervisors = scheduler.drivers() // Monitor hypervisor events @@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp } } } - .launchIn(domain) + .launchIn(coroutineScope) hypervisor.events .onEach { event -> when (event) { @@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp ) } } - .launchIn(domain) + .launchIn(coroutineScope) val driver = hypervisor.server.services[BareMetalDriver.Key] driver.powerDraw .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } - .launchIn(domain) + .launchIn(coroutineScope) } scheduler.events @@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp monitor.reportProvisionerMetrics(clock.millis(), event) } } - .launchIn(domain) + .launchIn(coroutineScope) } /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { - val domain = simulationContext.domain - +suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { try { var submitted = 0 @@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP val (time, workload) = reader.next() submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { + delay(max(0, time - clock.millis())) + coroutineScope.launch { chan.send(Unit) val server = scheduler.deploy( workload.image.name, @@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) + monitor.reportVmStateChange(clock.millis(), it.server) } } .collect() diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 7b42b095..76a10e56 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy @@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.random.Random /** @@ -52,19 +53,15 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the * same set of parameters. */ +@OptIn(ExperimentalCoroutinesApi::class) public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { override suspend fun invoke(context: ExperimentExecutionContext) { val experiment = parent.parent.parent - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val seeder = Random(seed) val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) @@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) parent.parent.parent.bufferSize ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( + this, + clock, seeder.nextInt(), parent.operationalPhenomena.failureFrequency, bareMetalProvisioner, @@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 5ecf7605..ebee1543 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngine -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService @@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader +import java.time.Clock /** * An integration test suite for the SC20 experiments. */ +@OptIn(ExperimentalCoroutinesApi::class) class Sc20IntegrationTest { /** - * The simulation engine to use. + * The [TestCoroutineScope] to use. */ - private lateinit var simulationEngine: SimulationEngine + private lateinit var testScope: TestCoroutineScope /** - * The root simulation domain to run in. + * The simulation clock to use. */ - private lateinit var root: Domain + private lateinit var clock: Clock /** * The monitor used to keep track of the metrics. @@ -79,9 +79,9 @@ class Sc20IntegrationTest { */ @BeforeEach fun setUp() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - simulationEngine = provider("test") - root = simulationEngine.newDomain("root") + testScope = TestCoroutineScope() + clock = DelayControllerClockAdapter(testScope) + monitor = TestExperimentReporter() } @@ -89,9 +89,7 @@ class Sc20IntegrationTest { * Tear down the experimental environment. */ @AfterEach - fun tearDown() = runBlocking { - simulationEngine.terminate() - } + fun tearDown() = testScope.cleanupTestCoroutines() @Test fun smoke() { @@ -103,9 +101,10 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader() lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) @@ -115,6 +114,8 @@ class Sc20IntegrationTest { val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( + this, + clock, seed, 24.0 * 7, bareMetalProvisioner, @@ -124,8 +125,10 @@ class Sc20IntegrationTest { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -159,16 +162,19 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader("single") lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) scheduler = res.second - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -195,9 +201,7 @@ class Sc20IntegrationTest { /** * Run the simulation. */ - private fun runSimulation() = runBlocking { - simulationEngine.run() - } + private fun runSimulation() = testScope.advanceUntilIdle() /** * Obtain the trace reader for the test. diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt index 4c4dcf37..570b936d 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/EnvironmentReader.kt @@ -27,6 +27,7 @@ package com.atlarge.opendc.format.environment import com.atlarge.opendc.core.Environment import kotlinx.coroutines.CoroutineScope import java.io.Closeable +import java.time.Clock /** * An interface for reading descriptions of topology environments into memory as [Environment]. @@ -35,5 +36,5 @@ interface EnvironmentReader : Closeable { /** * Construct an [Environment] in the specified [CoroutineScope]. */ - suspend fun construct(coroutineScope: CoroutineScope): Environment + suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment } diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt index 2b608aef..188d9fd8 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc18/Sc18EnvironmentReader.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.format.environment.sc18 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -41,6 +40,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope import java.io.InputStream +import java.time.Clock import java.util.UUID /** @@ -56,9 +56,7 @@ class Sc18EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock - + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt index 49118675..d7845081 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20ClusterEnvironmentReader.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -42,6 +41,7 @@ import kotlinx.coroutines.CoroutineScope import java.io.File import java.io.FileInputStream import java.io.InputStream +import java.time.Clock import java.util.Random import java.util.UUID @@ -57,9 +57,7 @@ class Sc20ClusterEnvironmentReader( constructor(file: File) : this(FileInputStream(file)) @Suppress("BlockingMethodInNonBlockingContext") - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock - + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { var clusterIdCol = 0 var speedCol = 0 var numberOfHostsCol = 0 diff --git a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt index f22f595f..adfa1cf0 100644 --- a/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt +++ b/simulator/opendc/opendc-format/src/main/kotlin/com/atlarge/opendc/format/environment/sc20/Sc20EnvironmentReader.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.format.environment.sc20 -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -42,6 +41,7 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import kotlinx.coroutines.CoroutineScope import java.io.InputStream +import java.time.Clock import java.util.UUID /** @@ -56,8 +56,7 @@ class Sc20EnvironmentReader(input: InputStream, mapper: ObjectMapper = jacksonOb */ private val setup: Setup = mapper.readValue(input) - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { var counter = 0 val nodes = setup.rooms.flatMap { room -> room.objects.flatMap { roomObject -> diff --git a/simulator/opendc/opendc-runner-web/build.gradle.kts b/simulator/opendc/opendc-runner-web/build.gradle.kts index 6f725de1..1d263e75 100644 --- a/simulator/opendc/opendc-runner-web/build.gradle.kts +++ b/simulator/opendc/opendc-runner-web/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { implementation(project(":opendc:opendc-compute")) implementation(project(":opendc:opendc-format")) implementation(project(":opendc:opendc-experiments-sc20")) + implementation(project(":opendc:opendc-simulator")) implementation("com.github.ajalt:clikt:2.8.0") implementation("io.github.microutils:kotlin-logging:1.7.10") diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt index 9cfe5531..ac4d9087 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/Main.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.* import com.atlarge.opendc.experiments.sc20.experiment.attachMonitor import com.atlarge.opendc.experiments.sc20.experiment.createFailureDomain @@ -24,8 +23,10 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Filters import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging import org.bson.Document +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File import java.util.* import kotlin.random.Random @@ -33,13 +34,9 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** * Represents the CLI command for starting the OpenDC web runner. */ +@OptIn(ExperimentalCoroutinesApi::class) class RunnerCli : CliktCommand(name = "runner") { /** * The name of the database to use. @@ -195,8 +192,8 @@ class RunnerCli : CliktCommand(name = "runner") { val workloadFraction = traceDocument.get("loadSamplingFraction", Number::class.java).toDouble() val seeder = Random(seed) - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val chan = Channel<Unit>(Channel.CONFLATED) @@ -230,9 +227,10 @@ class RunnerCli : CliktCommand(name = "runner") { 4096 ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -240,6 +238,8 @@ class RunnerCli : CliktCommand(name = "runner") { val failureDomain = if (operational.getBoolean("failuresEnabled")) { logger.debug("ENABLING failures") createFailureDomain( + testScope, + clock, seeder.nextInt(), operational.get("failureFrequency", Number::class.java)?.toDouble() ?: 24.0 * 7, bareMetalProvisioner, @@ -249,8 +249,10 @@ class RunnerCli : CliktCommand(name = "runner") { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -269,9 +271,8 @@ class RunnerCli : CliktCommand(name = "runner") { } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt index ab683985..f9b1c6c4 100644 --- a/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt +++ b/simulator/opendc/opendc-runner-web/src/main/kotlin/com/atlarge/opendc/runner/web/TopologyParser.kt @@ -1,6 +1,5 @@ package com.atlarge.opendc.runner.web -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ProcessingNode import com.atlarge.opendc.compute.core.ProcessingUnit @@ -23,6 +22,7 @@ import com.mongodb.client.model.Projections import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.launch import org.bson.Document +import java.time.Clock import java.util.* /** @@ -32,8 +32,7 @@ class TopologyParser(private val collection: MongoCollection<Document>, private /** * Parse the topology with the specified [id]. */ - override suspend fun construct(coroutineScope: CoroutineScope): Environment { - val clock = simulationContext.clock + override suspend fun construct(coroutineScope: CoroutineScope, clock: Clock): Environment { val nodes = mutableListOf<SimpleBareMetalDriver>() val random = Random(0) diff --git a/simulator/opendc/opendc-workflows/build.gradle.kts b/simulator/opendc/opendc-workflows/build.gradle.kts index 893c9020..62c4bc25 100644 --- a/simulator/opendc/opendc-workflows/build.gradle.kts +++ b/simulator/opendc/opendc-workflows/build.gradle.kts @@ -33,7 +33,7 @@ dependencies { api(project(":opendc:opendc-core")) api(project(":opendc:opendc-compute")) - testRuntimeOnly(project(":odcsim:odcsim-engine-omega")) + testImplementation(project(":opendc:opendc-simulator")) testImplementation(project(":opendc:opendc-format")) testImplementation("org.junit.jupiter:junit-jupiter-api:${Library.JUNIT_JUPITER}") testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${Library.JUNIT_JUPITER}") diff --git a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt index 655d8e1d..114003a3 100644 --- a/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt +++ b/simulator/opendc/opendc-workflows/src/test/kotlin/com/atlarge/opendc/workflows/service/StageWorkflowSchedulerIntegrationTest.kt @@ -24,8 +24,6 @@ package com.atlarge.opendc.workflows.service -import com.atlarge.odcsim.SimulationEngineProvider -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.format.environment.sc18.Sc18EnvironmentReader import com.atlarge.opendc.format.trace.gwf.GwfTraceReader @@ -35,22 +33,22 @@ import com.atlarge.opendc.workflows.service.stage.resource.FirstFitResourceSelec import com.atlarge.opendc.workflows.service.stage.resource.FunctionalResourceFilterPolicy import com.atlarge.opendc.workflows.service.stage.task.NullTaskEligibilityPolicy import com.atlarge.opendc.workflows.service.stage.task.SubmissionTimeTaskOrderPolicy -import kotlinx.coroutines.async -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -import java.util.ServiceLoader +import org.opendc.simulator.utils.DelayControllerClockAdapter import kotlin.math.max /** * Integration test suite for the [StageWorkflowService]. */ @DisplayName("StageWorkflowService") +@OptIn(ExperimentalCoroutinesApi::class) internal class StageWorkflowSchedulerIntegrationTest { /** * A large integration test where we check whether all tasks in some trace are executed correctly. @@ -63,17 +61,15 @@ internal class StageWorkflowSchedulerIntegrationTest { var tasksStarted = 0L var tasksFinished = 0L - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - val system = provider(name = "sim") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) - val schedulerDomain = system.newDomain(name = "scheduler") - val schedulerAsync = schedulerDomain.async { - val clock = simulationContext.clock + val schedulerAsync = testScope.async { val environment = Sc18EnvironmentReader(object {}.javaClass.getResourceAsStream("/environment.json")) - .use { it.construct(schedulerDomain) } + .use { it.construct(testScope, clock) } StageWorkflowService( - schedulerDomain, + testScope, clock, environment.platforms[0].zones[0].services[ProvisioningService], mode = WorkflowSchedulerMode.Batch(100), @@ -86,9 +82,7 @@ internal class StageWorkflowSchedulerIntegrationTest { ) } - val broker = system.newDomain(name = "broker") - - broker.launch { + testScope.launch { val scheduler = schedulerAsync.await() scheduler.events .onEach { event -> @@ -102,24 +96,21 @@ internal class StageWorkflowSchedulerIntegrationTest { .collect() } - broker.launch { - val ctx = simulationContext + testScope.launch { val reader = GwfTraceReader(object {}.javaClass.getResourceAsStream("/trace.gwf")) val scheduler = schedulerAsync.await() while (reader.hasNext()) { val (time, job) = reader.next() jobsSubmitted++ - delay(max(0, time * 1000 - ctx.clock.millis())) + delay(max(0, time * 1000 - clock.millis())) scheduler.submit(job) } } - runBlocking { - system.run() - system.terminate() - } + testScope.advanceUntilIdle() + assertNotEquals(0, jobsSubmitted, "No jobs submitted") assertEquals(jobsSubmitted, jobsStarted, "Not all submitted jobs started") assertEquals(jobsSubmitted, jobsFinished, "Not all started jobs finished") assertEquals(tasksStarted, tasksFinished, "Not all started tasks finished") |
