diff options
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc20/src')
3 files changed, 70 insertions, 72 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt index 3765f307..b68ee97e 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationContext -import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ServerEvent import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel @@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.trace.TraceReader -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.takeWhile -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext import mu.KotlinLogging import java.io.File +import java.time.Clock import kotlin.math.ln import kotlin.math.max import kotlin.random.Random @@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {} * Construct the failure domain for the experiments. */ suspend fun createFailureDomain( + coroutineScope: CoroutineScope, + clock: Clock, seed: Int, failureInterval: Double, bareMetalProvisioner: ProvisioningService, chan: Channel<Unit> -): Domain { - val root = simulationContext.domain - val domain = root.newDomain(name = "failures") - domain.launch { +): CoroutineScope { + val job = coroutineScope.launch { chan.receive() val random = Random(seed) val injectors = mutableMapOf<String, FaultInjector>() @@ -86,7 +81,8 @@ suspend fun createFailureDomain( val injector = injectors.getOrPut(cluster) { createFaultInjector( - simulationContext, + this, + clock, random, failureInterval ) @@ -94,18 +90,18 @@ suspend fun createFailureDomain( injector.enqueue(node.metadata["driver"] as FailureDomain) } } - return domain + return CoroutineScope(coroutineScope.coroutineContext + job) } /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector { +fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 return CorrelatedFaultInjector( - simulationContext.domain, - simulationContext.clock, + coroutineScope, + clock, iatScale = ln(failureInterval), iatShape = 1.03, // Hours sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes @@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter * Construct the environment for a VM provisioner and return the provisioner instance. */ suspend fun createProvisioner( - root: Domain, + coroutineScope: CoroutineScope, + clock: Clock, environmentReader: EnvironmentReader, allocationPolicy: AllocationPolicy -): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) { - val environment = environmentReader.use { it.construct(root) } +): Pair<ProvisioningService, SimpleVirtProvisioningService> { + val environment = environmentReader.use { it.construct(coroutineScope, clock) } val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService] // Wait for the bare metal nodes to be spawned delay(10) - val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy) + val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy) // Wait for the hypervisors to be spawned delay(10) - bareMetalProvisioner to scheduler + return bareMetalProvisioner to scheduler } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { - val domain = simulationContext.domain - val clock = simulationContext.clock +suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) { val hypervisors = scheduler.drivers() // Monitor hypervisor events @@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp } } } - .launchIn(domain) + .launchIn(coroutineScope) hypervisor.events .onEach { event -> when (event) { @@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp ) } } - .launchIn(domain) + .launchIn(coroutineScope) val driver = hypervisor.server.services[BareMetalDriver.Key] driver.powerDraw .onEach { monitor.reportPowerConsumption(hypervisor.server, it) } - .launchIn(domain) + .launchIn(coroutineScope) } scheduler.events @@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp monitor.reportProvisionerMetrics(clock.millis(), event) } } - .launchIn(domain) + .launchIn(coroutineScope) } /** * Process the trace. */ -suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { - val domain = simulationContext.domain - +suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) { try { var submitted = 0 @@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP val (time, workload) = reader.next() submitted++ - delay(max(0, time - simulationContext.clock.millis())) - domain.launch { + delay(max(0, time - clock.millis())) + coroutineScope.launch { chan.send(Unit) val server = scheduler.deploy( workload.image.name, @@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP server.events .onEach { if (it is ServerEvent.StateChanged) { - monitor.reportVmStateChange(simulationContext.clock.millis(), it.server) + monitor.reportVmStateChange(clock.millis(), it.server) } } .collect() diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt index 7b42b095..76a10e56 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt @@ -24,7 +24,6 @@ package com.atlarge.opendc.experiments.sc20.experiment -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy @@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestCoroutineScope import mu.KotlinLogging +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader import kotlin.random.Random /** @@ -52,19 +53,15 @@ import kotlin.random.Random private val logger = KotlinLogging.logger {} /** - * The provider for the simulation engine to use. - */ -private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - -/** * An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the * same set of parameters. */ +@OptIn(ExperimentalCoroutinesApi::class) public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() { override suspend fun invoke(context: ExperimentExecutionContext) { val experiment = parent.parent.parent - val system = provider("experiment-$id") - val root = system.newDomain("root") + val testScope = TestCoroutineScope() + val clock = DelayControllerClockAdapter(testScope) val seeder = Random(seed) val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt")) @@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) parent.parent.parent.bufferSize ) - root.launch { + testScope.launch { val (bareMetalProvisioner, scheduler) = createProvisioner( - root, + this, + clock, environment, allocationPolicy ) @@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( + this, + clock, seeder.nextInt(), parent.operationalPhenomena.failureFrequency, bareMetalProvisioner, @@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, trace, scheduler, chan, @@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int) } try { - system.run() + testScope.advanceUntilIdle() } finally { - system.terminate() monitor.close() } } diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt index 5ecf7605..ebee1543 100644 --- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt +++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt @@ -24,9 +24,6 @@ package com.atlarge.opendc.experiments.sc20 -import com.atlarge.odcsim.Domain -import com.atlarge.odcsim.SimulationEngine -import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.workload.VmWorkload import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService @@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader import com.atlarge.opendc.format.environment.EnvironmentReader import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.TraceReader +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.TestCoroutineScope import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions.assertAll import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.opendc.simulator.utils.DelayControllerClockAdapter import java.io.File -import java.util.ServiceLoader +import java.time.Clock /** * An integration test suite for the SC20 experiments. */ +@OptIn(ExperimentalCoroutinesApi::class) class Sc20IntegrationTest { /** - * The simulation engine to use. + * The [TestCoroutineScope] to use. */ - private lateinit var simulationEngine: SimulationEngine + private lateinit var testScope: TestCoroutineScope /** - * The root simulation domain to run in. + * The simulation clock to use. */ - private lateinit var root: Domain + private lateinit var clock: Clock /** * The monitor used to keep track of the metrics. @@ -79,9 +79,9 @@ class Sc20IntegrationTest { */ @BeforeEach fun setUp() { - val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() - simulationEngine = provider("test") - root = simulationEngine.newDomain("root") + testScope = TestCoroutineScope() + clock = DelayControllerClockAdapter(testScope) + monitor = TestExperimentReporter() } @@ -89,9 +89,7 @@ class Sc20IntegrationTest { * Tear down the experimental environment. */ @AfterEach - fun tearDown() = runBlocking { - simulationEngine.terminate() - } + fun tearDown() = testScope.cleanupTestCoroutines() @Test fun smoke() { @@ -103,9 +101,10 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader() lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) @@ -115,6 +114,8 @@ class Sc20IntegrationTest { val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( + this, + clock, seed, 24.0 * 7, bareMetalProvisioner, @@ -124,8 +125,10 @@ class Sc20IntegrationTest { null } - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -159,16 +162,19 @@ class Sc20IntegrationTest { val environmentReader = createTestEnvironmentReader("single") lateinit var scheduler: SimpleVirtProvisioningService - root.launch { + testScope.launch { val res = createProvisioner( - root, + this, + clock, environmentReader, allocationPolicy ) scheduler = res.second - attachMonitor(scheduler, monitor) + attachMonitor(this, clock, scheduler, monitor) processTrace( + this, + clock, traceReader, scheduler, chan, @@ -195,9 +201,7 @@ class Sc20IntegrationTest { /** * Run the simulation. */ - private fun runSimulation() = runBlocking { - simulationEngine.run() - } + private fun runSimulation() = testScope.advanceUntilIdle() /** * Obtain the trace reader for the test. |
