diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 23:56:07 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-09-30 23:56:07 +0200 |
| commit | fcae560208df4860bc7461f955bf3b522b0e61c5 (patch) | |
| tree | 933f47f1061274a6a7e648da82c13f08fce41ea5 /simulator/opendc/opendc-experiments-sc20 | |
| parent | 1766888d6dde44f96508a4bc6878978ddcaa073d (diff) | |
Migrate from Domain to TestCoroutineScope
This change eliminates the use of Domain and simulationContext in favour
of the generic (Test)CoroutineScope and Clock classes. In this way, we
decouple the OpenDC modules and their logic from simulation-related
code.
In this way, we also simplify eventual attempt for emulating OpenDC
componments in real-time.
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc20')
4 files changed, 71 insertions, 72 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts index a66612e4..713354e7 100644 --- a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts +++ b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts @@ -38,6 +38,7 @@ application { dependencies { api(project(":opendc:opendc-core")) implementation(project(":opendc:opendc-format")) + implementation(project(":opendc:opendc-simulator")) implementation("com.github.ajalt:clikt:2.6.0") implementation("me.tongfei:progressbar:0.8.1") diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 3765f307..b68ee97e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel @@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File +import java.time.Clock import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {} * Construct the failure domain for the experiments. */ suspend fun createFailureDomain( + coroutineScope: CoroutineScope, + clock: Clock, seed: Int, failureInterval: Double, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit> -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { +): CoroutineScope { + val job = coroutineScope.launch { chan.receive() val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() @@ -86,7 +81,8 @@ suspend fun createFailureDomain( val injector = injectors.getOrPut(cluster) { createFaultInjector( - simulationContext, + this, + clock, random, failureInterval ) @@ -94,18 +90,18 @@ suspend fun createFailureDomain( injector.enqueue(node.metadata["driver"] as FailureDomain) } } - return domain + return CoroutineScope(coroutineScope.coroutineContext + job) } /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector { +fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( - simulationContext.domain, - simulationContext.clock, + coroutineScope, + clock, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes @@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter * Construct the environment for a VM provisioner and return the provisioner instance. */ suspend fun createProvisioner( - root: Domain, + coroutineScope: CoroutineScope, + clock: Clock, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy -): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } +): Pair<ProvisioningService, SimpleVirtProvisioningService> { + val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy) + val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy) // Wait for the hypervisors to be spawned delay(10) - bareMetalProvisioner to scheduler + return bareMetalProvisioner to scheduler } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { - val domain = simulationContext.domain - val clock = simulationContext.clock +suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { val hypervisors = scheduler.drivers() // Monitor hypervisor events @@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp } } } - .launchIn(domain) + .launchIn(coroutineScope) hypervisor.events .onEach { event -> when (event) { @@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp ) } } - .launchIn(domain) + .launchIn(coroutineScope) val driver = hypervisor.server.services[BareMetalDriver.Key] driver.powerDraw .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } - .launchIn(domain) + .launchIn(coroutineScope) } scheduler.events @@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp monitor.reportProvisionerMetrics(clock.millis(), event) } } - .launchIn(domain) + .launchIn(coroutineScope) } /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { - val domain = simulationContext.domain - +suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { try { var submitted = 0 @@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP val (time, workload) = reader.next() submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { + delay(max(0, time - clock.millis())) + coroutineScope.launch { chan.send(Unit) val server = scheduler.deploy( workload.image.name, @@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) + monitor.reportVmStateChange(clock.millis(), it.server) } } .collect() diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 7b42b095..76a10e56 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy @@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.random.Random /** @@ -52,19 +53,15 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the * same set of parameters. */ +@OptIn(ExperimentalCoroutinesApi::class) public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { override suspend fun invoke(context: ExperimentExecutionContext) { val experiment = parent.parent.parent - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val seeder = Random(seed) val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) @@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) parent.parent.parent.bufferSize ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( + this, + clock, seeder.nextInt(), parent.operationalPhenomena.failureFrequency, bareMetalProvisioner, @@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 5ecf7605..ebee1543 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngine -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService @@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader +import java.time.Clock /** * An integration test suite for the SC20 experiments. */ +@OptIn(ExperimentalCoroutinesApi::class) class Sc20IntegrationTest { /** - * The simulation engine to use. + * The [TestCoroutineScope] to use. */ - private lateinit var simulationEngine: SimulationEngine + private lateinit var testScope: TestCoroutineScope /** - * The root simulation domain to run in. + * The simulation clock to use. */ - private lateinit var root: Domain + private lateinit var clock: Clock /** * The monitor used to keep track of the metrics. @@ -79,9 +79,9 @@ class Sc20IntegrationTest { */ @BeforeEach fun setUp() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - simulationEngine = provider("test") - root = simulationEngine.newDomain("root") + testScope = TestCoroutineScope() + clock = DelayControllerClockAdapter(testScope) + monitor = TestExperimentReporter() } @@ -89,9 +89,7 @@ class Sc20IntegrationTest { * Tear down the experimental environment. */ @AfterEach - fun tearDown() = runBlocking { - simulationEngine.terminate() - } + fun tearDown() = testScope.cleanupTestCoroutines() @Test fun smoke() { @@ -103,9 +101,10 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader() lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) @@ -115,6 +114,8 @@ class Sc20IntegrationTest { val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( + this, + clock, seed, 24.0 * 7, bareMetalProvisioner, @@ -124,8 +125,10 @@ class Sc20IntegrationTest { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -159,16 +162,19 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader("single") lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) scheduler = res.second - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -195,9 +201,7 @@ class Sc20IntegrationTest { /** * Run the simulation. */ - private fun runSimulation() = runBlocking { - simulationEngine.run() - } + private fun runSimulation() = testScope.advanceUntilIdle() /** * Obtain the trace reader for the test. |
