summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-experiments-sc20/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc20/src')
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt61
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt29
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt52
3 files changed, 70 insertions, 72 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 3765f307..b68ee97e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
@@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
+import java.time.Clock
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {}
* Construct the failure domain for the experiments.
*/
suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
seed: Int,
failureInterval: Double,
bareMetalProvisioner: ProvisioningService,
chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
+): CoroutineScope {
+ val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
@@ -86,7 +81,8 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext,
+ this,
+ clock,
random,
failureInterval
)
@@ -94,18 +90,18 @@ suspend fun createFailureDomain(
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
- return domain
+ return CoroutineScope(coroutineScope.coroutineContext + job)
}
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- simulationContext.domain,
- simulationContext.clock,
+ coroutineScope,
+ clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
suspend fun createProvisioner(
- root: Domain,
+ coroutineScope: CoroutineScope,
+ clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
+): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
- bareMetalProvisioner to scheduler
+ return bareMetalProvisioner to scheduler
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
- val domain = simulationContext.domain
- val clock = simulationContext.clock
+suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
@@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
}
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
hypervisor.events
.onEach { event ->
when (event) {
@@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
val driver = hypervisor.server.services[BareMetalDriver.Key]
driver.powerDraw
.onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
scheduler.events
@@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
+suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
try {
var submitted = 0
@@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val (time, workload) = reader.next()
submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
chan.send(Unit)
val server = scheduler.deploy(
workload.image.name,
@@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
server.events
.onEach {
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
+ monitor.reportVmStateChange(clock.millis(), it.server)
}
}
.collect()
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 7b42b095..76a10e56 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
@@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.random.Random
/**
@@ -52,19 +53,15 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
* same set of parameters.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
override suspend fun invoke(context: ExperimentExecutionContext) {
val experiment = parent.parent.parent
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val seeder = Random(seed)
val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
@@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
parent.parent.parent.bufferSize
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seeder.nextInt(),
parent.operationalPhenomena.failureFrequency,
bareMetalProvisioner,
@@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}
diff --git a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
index 5ecf7605..ebee1543 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/test/kotlin/com/atlarge/opendc/experiments/sc20/Sc20IntegrationTest.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationEngine
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.workload.VmWorkload
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
@@ -42,32 +39,35 @@ import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
-import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.TestCoroutineScope
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
+import java.time.Clock
/**
* An integration test suite for the SC20 experiments.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
class Sc20IntegrationTest {
/**
- * The simulation engine to use.
+ * The [TestCoroutineScope] to use.
*/
- private lateinit var simulationEngine: SimulationEngine
+ private lateinit var testScope: TestCoroutineScope
/**
- * The root simulation domain to run in.
+ * The simulation clock to use.
*/
- private lateinit var root: Domain
+ private lateinit var clock: Clock
/**
* The monitor used to keep track of the metrics.
@@ -79,9 +79,9 @@ class Sc20IntegrationTest {
*/
@BeforeEach
fun setUp() {
- val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
- simulationEngine = provider("test")
- root = simulationEngine.newDomain("root")
+ testScope = TestCoroutineScope()
+ clock = DelayControllerClockAdapter(testScope)
+
monitor = TestExperimentReporter()
}
@@ -89,9 +89,7 @@ class Sc20IntegrationTest {
* Tear down the experimental environment.
*/
@AfterEach
- fun tearDown() = runBlocking {
- simulationEngine.terminate()
- }
+ fun tearDown() = testScope.cleanupTestCoroutines()
@Test
fun smoke() {
@@ -103,9 +101,10 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader()
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
@@ -115,6 +114,8 @@ class Sc20IntegrationTest {
val failureDomain = if (failures) {
println("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seed,
24.0 * 7,
bareMetalProvisioner,
@@ -124,8 +125,10 @@ class Sc20IntegrationTest {
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -159,16 +162,19 @@ class Sc20IntegrationTest {
val environmentReader = createTestEnvironmentReader("single")
lateinit var scheduler: SimpleVirtProvisioningService
- root.launch {
+ testScope.launch {
val res = createProvisioner(
- root,
+ this,
+ clock,
environmentReader,
allocationPolicy
)
scheduler = res.second
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
traceReader,
scheduler,
chan,
@@ -195,9 +201,7 @@ class Sc20IntegrationTest {
/**
* Run the simulation.
*/
- private fun runSimulation() = runBlocking {
- simulationEngine.run()
- }
+ private fun runSimulation() = testScope.advanceUntilIdle()
/**
* Obtain the trace reader for the test.