summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-experiments-sc20
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:56:07 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-09-30 23:56:07 +0200
commitfcae560208df4860bc7461f955bf3b522b0e61c5 (patch)
tree933f47f1061274a6a7e648da82c13f08fce41ea5 /simulator/opendc/opendc-experiments-sc20
parent1766888d6dde44f96508a4bc6878978ddcaa073d (diff)
Migrate from Domain to TestCoroutineScope
This change eliminates the use of Domain and simulationContext in favour of the generic (Test)CoroutineScope and Clock classes. In this way, we decouple the OpenDC modules and their logic from simulation-related code. In this way, we also simplify eventual attempt for emulating OpenDC componments in real-time.
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc20')
-rw-r--r--simulator/opendc/opendc-experiments-sc20/build.gradle.kts1
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt61
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt29
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt52
4 files changed, 71 insertions, 72 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
index a66612e4..713354e7 100644
--- a/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
+++ b/simulator/opendc/opendc-experiments-sc20/build.gradle.kts
@@ -38,6 +38,7 @@ application {
dependencies {
api(project(":opendc:opendc-core"))
implementation(project(":opendc:opendc-format"))
+ implementation(project(":opendc:opendc-simulator"))
implementation("com.github.ajalt:clikt:2.6.0")
implementation("me.tongfei:progressbar:0.8.1")
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 3765f307..b68ee97e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
@@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
+import java.time.Clock
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {}
* Construct the failure domain for the experiments.
*/
suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
seed: Int,
failureInterval: Double,
bareMetalProvisioner: ProvisioningService,
chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
+): CoroutineScope {
+ val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
@@ -86,7 +81,8 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext,
+ this,
+ clock,
random,
failureInterval
)
@@ -94,18 +90,18 @@ suspend fun createFailureDomain(
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
- return domain
+ return CoroutineScope(coroutineScope.coroutineContext + job)
}
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- simulationContext.domain,
- simulationContext.clock,
+ coroutineScope,
+ clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
suspend fun createProvisioner(
- root: Domain,
+ coroutineScope: CoroutineScope,
+ clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
+): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
- bareMetalProvisioner to scheduler
+ return bareMetalProvisioner to scheduler
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
- val domain = simulationContext.domain
- val clock = simulationContext.clock
+suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
@@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
}
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
hypervisor.events
.onEach { event ->
when (event) {
@@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
val driver = hypervisor.server.services[BareMetalDriver.Key]
driver.powerDraw
.onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
scheduler.events
@@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
+suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
try {
var submitted = 0
@@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val (time, workload) = reader.next()
submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
chan.send(Unit)
val server = scheduler.deploy(
workload.image.name,
@@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
server.events
.onEach {
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
+ monitor.reportVmStateChange(clock.millis(), it.server)
}
}
.collect()
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 7b42b095..76a10e56 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
@@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.random.Random
/**
@@ -52,19 +53,15 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
* same set of parameters.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
override suspend fun invoke(context: ExperimentExecutionContext) {
val experiment = parent.parent.parent
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val seeder = Random(seed)
val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
@@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
parent.parent.parent.bufferSize
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seeder.nextInt(),
parent.operationalPhenomena.failureFrequency,
bareMetalProvisioner,
@@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5ecf7605..ebee1543 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationEngine
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
@@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
+import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
class Sc20IntegrationTest {
/**
- * The simulation engine to use.
+ * The [TestCoroutineScope] to use.
*/
- private lateinit var simulationEngine: SimulationEngine
+ private lateinit var testScope: TestCoroutineScope
/**
- * The root simulation domain to run in.
+ * The simulation clock to use.
*/
- private lateinit var root: Domain
+ private lateinit var clock: Clock
/**
* The monitor used to keep track of the metrics.
@@ -79,9 +79,9 @@ class Sc20IntegrationTest {
*/
@BeforeEach
fun setUp() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- simulationEngine = provider("test")
- root = simulationEngine.newDomain("root")
+ testScope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(testScope)
+
monitor = TestExperimentReporter()
}
@@ -89,9 +89,7 @@ class Sc20IntegrationTest {
* Tear down the experimental environment.
*/
@AfterEach
- fun tearDown() = runBlocking {
- simulationEngine.terminate()
- }
+ fun tearDown() = testScope.cleanupTestCoroutines()
@Test
fun smoke() {
@@ -103,9 +101,10 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader()
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
@@ -115,6 +114,8 @@ class Sc20IntegrationTest {
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seed,
24.0 * 7,
bareMetalProvisioner,
@@ -124,8 +125,10 @@ class Sc20IntegrationTest {
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -159,16 +162,19 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader("single")
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
scheduler = res.second
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -195,9 +201,7 @@ class Sc20IntegrationTest {
/**
* Run the simulation.
*/
- private fun runSimulation() = runBlocking {
- simulationEngine.run()
- }
+ private fun runSimulation() = testScope.advanceUntilIdle()
/**
* Obtain the trace reader for the test.