summaryrefslogtreecommitdiff
path: root/simulator/opendc/opendc-experiments-sc20/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc/opendc-experiments-sc20/src/main')
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt61
-rw-r--r--simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt29
2 files changed, 42 insertions, 48 deletions
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
index 3765f307..b68ee97e 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/ExperimentHelpers.kt
@@ -24,9 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.Domain
-import com.atlarge.odcsim.SimulationContext
-import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ServerEvent
import com.atlarge.opendc.compute.core.workload.PerformanceInterferenceModel
@@ -46,17 +43,15 @@ import com.atlarge.opendc.experiments.sc20.experiment.monitor.ExperimentMonitor
import com.atlarge.opendc.experiments.sc20.trace.Sc20StreamingParquetTraceReader
import com.atlarge.opendc.format.environment.EnvironmentReader
import com.atlarge.opendc.format.trace.TraceReader
-import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.withContext
import mu.KotlinLogging
import java.io.File
+import java.time.Clock
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
@@ -70,14 +65,14 @@ private val logger = KotlinLogging.logger {}
* Construct the failure domain for the experiments.
*/
suspend fun createFailureDomain(
+ coroutineScope: CoroutineScope,
+ clock: Clock,
seed: Int,
failureInterval: Double,
bareMetalProvisioner: ProvisioningService,
chan: Channel<Unit>
-): Domain {
- val root = simulationContext.domain
- val domain = root.newDomain(name = "failures")
- domain.launch {
+): CoroutineScope {
+ val job = coroutineScope.launch {
chan.receive()
val random = Random(seed)
val injectors = mutableMapOf<String, FaultInjector>()
@@ -86,7 +81,8 @@ suspend fun createFailureDomain(
val injector =
injectors.getOrPut(cluster) {
createFaultInjector(
- simulationContext,
+ this,
+ clock,
random,
failureInterval
)
@@ -94,18 +90,18 @@ suspend fun createFailureDomain(
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
- return domain
+ return CoroutineScope(coroutineScope.coroutineContext + job)
}
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(simulationContext: SimulationContext, random: Random, failureInterval: Double): FaultInjector {
+fun createFaultInjector(coroutineScope: CoroutineScope, clock: Clock, random: Random, failureInterval: Double): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
return CorrelatedFaultInjector(
- simulationContext.domain,
- simulationContext.clock,
+ coroutineScope,
+ clock,
iatScale = ln(failureInterval), iatShape = 1.03, // Hours
sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
@@ -129,31 +125,30 @@ fun createTraceReader(path: File, performanceInterferenceModel: PerformanceInter
* Construct the environment for a VM provisioner and return the provisioner instance.
*/
suspend fun createProvisioner(
- root: Domain,
+ coroutineScope: CoroutineScope,
+ clock: Clock,
environmentReader: EnvironmentReader,
allocationPolicy: AllocationPolicy
-): Pair<ProvisioningService, SimpleVirtProvisioningService> = withContext(root.coroutineContext) {
- val environment = environmentReader.use { it.construct(root) }
+): Pair<ProvisioningService, SimpleVirtProvisioningService> {
+ val environment = environmentReader.use { it.construct(coroutineScope, clock) }
val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService]
// Wait for the bare metal nodes to be spawned
delay(10)
- val scheduler = SimpleVirtProvisioningService(simulationContext.domain, simulationContext.clock, bareMetalProvisioner, allocationPolicy)
+ val scheduler = SimpleVirtProvisioningService(coroutineScope, clock, bareMetalProvisioner, allocationPolicy)
// Wait for the hypervisors to be spawned
delay(10)
- bareMetalProvisioner to scheduler
+ return bareMetalProvisioner to scheduler
}
/**
* Attach the specified monitor to the VM provisioner.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
- val domain = simulationContext.domain
- val clock = simulationContext.clock
+suspend fun attachMonitor(coroutineScope: CoroutineScope, clock: Clock, scheduler: SimpleVirtProvisioningService, monitor: ExperimentMonitor) {
val hypervisors = scheduler.drivers()
// Monitor hypervisor events
@@ -169,7 +164,7 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
}
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
hypervisor.events
.onEach { event ->
when (event) {
@@ -186,12 +181,12 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
val driver = hypervisor.server.services[BareMetalDriver.Key]
driver.powerDraw
.onEach { monitor.reportPowerConsumption(hypervisor.server, it) }
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
scheduler.events
@@ -201,15 +196,13 @@ suspend fun attachMonitor(scheduler: SimpleVirtProvisioningService, monitor: Exp
monitor.reportProvisionerMetrics(clock.millis(), event)
}
}
- .launchIn(domain)
+ .launchIn(coroutineScope)
}
/**
* Process the trace.
*/
-suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
- val domain = simulationContext.domain
-
+suspend fun processTrace(coroutineScope: CoroutineScope, clock: Clock, reader: TraceReader<VmWorkload>, scheduler: SimpleVirtProvisioningService, chan: Channel<Unit>, monitor: ExperimentMonitor, vmPlacements: Map<String, String> = emptyMap()) {
try {
var submitted = 0
@@ -217,8 +210,8 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
val (time, workload) = reader.next()
submitted++
- delay(max(0, time - simulationContext.clock.millis()))
- domain.launch {
+ delay(max(0, time - clock.millis()))
+ coroutineScope.launch {
chan.send(Unit)
val server = scheduler.deploy(
workload.image.name,
@@ -229,7 +222,7 @@ suspend fun processTrace(reader: TraceReader<VmWorkload>, scheduler: SimpleVirtP
server.events
.onEach {
if (it is ServerEvent.StateChanged) {
- monitor.reportVmStateChange(simulationContext.clock.millis(), it.server)
+ monitor.reportVmStateChange(clock.millis(), it.server)
}
}
.collect()
diff --git a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
index 7b42b095..76a10e56 100644
--- a/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
+++ b/simulator/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/experiment/Run.kt
@@ -24,7 +24,6 @@
package com.atlarge.opendc.experiments.sc20.experiment
-import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.opendc.compute.virt.service.allocation.AvailableCoreMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
import com.atlarge.opendc.compute.virt.service.allocation.NumberOfActiveServersAllocationPolicy
@@ -38,12 +37,14 @@ import com.atlarge.opendc.experiments.sc20.runner.execution.ExperimentExecutionC
import com.atlarge.opendc.experiments.sc20.trace.Sc20ParquetTraceReader
import com.atlarge.opendc.experiments.sc20.trace.Sc20RawParquetTraceReader
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.TestCoroutineScope
import mu.KotlinLogging
+import org.opendc.simulator.utils.DelayControllerClockAdapter
import java.io.File
-import java.util.ServiceLoader
import kotlin.random.Random
/**
@@ -52,19 +53,15 @@ import kotlin.random.Random
private val logger = KotlinLogging.logger {}
/**
- * The provider for the simulation engine to use.
- */
-private val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
-
-/**
* An experiment run represent a single invocation of a trial and is used to distinguish between repetitions of the
* same set of parameters.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
public data class Run(override val parent: Scenario, val id: Int, val seed: Int) : TrialExperimentDescriptor() {
override suspend fun invoke(context: ExperimentExecutionContext) {
val experiment = parent.parent.parent
- val system = provider("experiment-$id")
- val root = system.newDomain("root")
+ val testScope = TestCoroutineScope()
+ val clock = DelayControllerClockAdapter(testScope)
val seeder = Random(seed)
val environment = Sc20ClusterEnvironmentReader(File(experiment.environments, "${parent.topology.name}.txt"))
@@ -112,9 +109,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
parent.parent.parent.bufferSize
)
- root.launch {
+ testScope.launch {
val (bareMetalProvisioner, scheduler) = createProvisioner(
- root,
+ this,
+ clock,
environment,
allocationPolicy
)
@@ -122,6 +120,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
val failureDomain = if (parent.operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
createFailureDomain(
+ this,
+ clock,
seeder.nextInt(),
parent.operationalPhenomena.failureFrequency,
bareMetalProvisioner,
@@ -131,8 +131,10 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
null
}
- attachMonitor(scheduler, monitor)
+ attachMonitor(this, clock, scheduler, monitor)
processTrace(
+ this,
+ clock,
trace,
scheduler,
chan,
@@ -151,9 +153,8 @@ public data class Run(override val parent: Scenario, val id: Int, val seed: Int)
}
try {
- system.run()
+ testScope.advanceUntilIdle()
} finally {
- system.terminate()
monitor.close()
}
}