summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt97
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt17
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt65
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt2
4 files changed, 65 insertions, 116 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 0230409e..8227bca9 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -25,7 +25,8 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
+import org.apache.commons.math3.distribution.LogNormalDistribution
+import org.apache.commons.math3.random.Well19937c
import org.opendc.compute.api.*
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.scheduler.ComputeScheduler
@@ -39,6 +40,9 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher
import org.opendc.compute.service.scheduler.weights.RamWeigher
import org.opendc.compute.service.scheduler.weights.VCpuWeigher
import org.opendc.compute.simulator.SimHost
+import org.opendc.compute.simulator.failure.HostFaultInjector
+import org.opendc.compute.simulator.failure.StartStopHostFault
+import org.opendc.compute.simulator.failure.StochasticVictimSelector
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.trace.TraceReader
import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider
@@ -46,67 +50,36 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel
import org.opendc.simulator.compute.power.SimplePowerDriver
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.compute.workload.SimWorkload
-import org.opendc.simulator.failures.CorrelatedFaultInjector
-import org.opendc.simulator.failures.FaultInjector
import org.opendc.simulator.resources.SimResourceInterpreter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.sdk.toOtelClock
import java.time.Clock
-import kotlin.coroutines.resume
+import kotlin.coroutines.CoroutineContext
import kotlin.math.ln
import kotlin.math.max
import kotlin.random.Random
/**
- * Construct the failure domain for the experiments.
- */
-fun createFailureDomain(
- coroutineScope: CoroutineScope,
- clock: Clock,
- seed: Int,
- failureInterval: Double,
- service: ComputeService,
- chan: Channel<Unit>
-): CoroutineScope {
- val job = coroutineScope.launch {
- chan.receive()
- val random = Random(seed)
- val injectors = mutableMapOf<String, FaultInjector>()
- for (host in service.hosts) {
- val cluster = host.meta["cluster"] as String
- val injector =
- injectors.getOrPut(cluster) {
- createFaultInjector(
- this,
- clock,
- random,
- failureInterval
- )
- }
- injector.enqueue(host as SimHost)
- }
- }
- return CoroutineScope(coroutineScope.coroutineContext + job)
-}
-
-/**
* Obtain the [FaultInjector] to use for the experiments.
*/
fun createFaultInjector(
- coroutineScope: CoroutineScope,
+ context: CoroutineContext,
clock: Clock,
- random: Random,
+ hosts: Set<SimHost>,
+ seed: Int,
failureInterval: Double
-): FaultInjector {
+): HostFaultInjector {
+ val rng = Well19937c(seed)
+
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
// GRID'5000
- return CorrelatedFaultInjector(
- coroutineScope,
+ return HostFaultInjector(
+ context,
clock,
- iatScale = ln(failureInterval), iatShape = 1.03, // Hours
- sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1
- dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes
- random = random
+ hosts,
+ iat = LogNormalDistribution(rng, ln(failureInterval), 1.03),
+ selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)),
+ fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71))
)
}
@@ -162,14 +135,22 @@ suspend fun processTrace(
clock: Clock,
reader: TraceReader<SimWorkload>,
scheduler: ComputeService,
- chan: Channel<Unit>,
monitor: ComputeMonitor? = null,
) {
val client = scheduler.newClient()
+ val watcher = object : ServerWatcher {
+ override fun onStateChanged(server: Server, newState: ServerState) {
+ monitor?.onStateChange(clock.millis(), server, newState)
+ }
+ }
+
+ // Create new image for the virtual machine
val image = client.newImage("vm-image")
- var offset = Long.MIN_VALUE
+
try {
coroutineScope {
+ var offset = Long.MIN_VALUE
+
while (reader.hasNext()) {
val entry = reader.next()
@@ -180,9 +161,11 @@ suspend fun processTrace(
// Make sure the trace entries are ordered by submission time
assert(entry.start - offset >= 0) { "Invalid trace order" }
delay(max(0, (entry.start - offset) - clock.millis()))
+
launch {
- chan.send(Unit)
- val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001)
+ val workloadOffset = -offset + 300001
+ val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset)
+
val server = client.newServer(
entry.name,
image,
@@ -193,18 +176,14 @@ suspend fun processTrace(
),
meta = entry.meta + mapOf("workload" to workload)
)
+ server.watch(watcher)
- suspendCancellableCoroutine { cont ->
- server.watch(object : ServerWatcher {
- override fun onStateChanged(server: Server, newState: ServerState) {
- monitor?.onStateChange(clock.millis(), server, newState)
+ // Wait for the server reach its end time
+ val endTime = entry.meta["end-time"] as Long
+ delay(endTime + workloadOffset - clock.millis() + 1)
- if (newState == ServerState.TERMINATED) {
- cont.resume(Unit)
- }
- }
- })
- }
+ // Delete the server after reaching the end-time of the virtual machine
+ server.delete()
}
}
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
index 4db04591..82794471 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt
@@ -25,9 +25,8 @@ package org.opendc.experiments.capelin
import com.typesafe.config.ConfigFactory
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import mu.KotlinLogging
+import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor
import org.opendc.experiments.capelin.model.CompositeWorkload
@@ -103,7 +102,6 @@ abstract class Portfolio(name: String) : Experiment(name) {
val seeder = Random(repeat.toLong())
val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt"))
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements)
val meterProvider = createMeterProvider(clock)
@@ -137,31 +135,30 @@ abstract class Portfolio(name: String) : Experiment(name) {
)
withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler ->
- val failureDomain = if (operationalPhenomena.failureFrequency > 0) {
+ val faultInjector = if (operationalPhenomena.failureFrequency > 0) {
logger.debug("ENABLING failures")
- createFailureDomain(
- this,
+ createFaultInjector(
+ coroutineContext,
clock,
+ scheduler.hosts.map { it as SimHost }.toSet(),
seeder.nextInt(),
operationalPhenomena.failureFrequency,
- scheduler,
- chan
)
} else {
null
}
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
+ faultInjector?.start()
processTrace(
clock,
trace,
scheduler,
- chan,
monitor
)
}
- failureDomain?.cancel()
+ faultInjector?.close()
monitor.close()
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
index aed9a4bb..44cf92a8 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt
@@ -23,8 +23,6 @@
package org.opendc.experiments.capelin
import io.opentelemetry.sdk.metrics.export.MetricProducer
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.channels.Channel
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -34,6 +32,7 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter
import org.opendc.compute.service.scheduler.filters.RamFilter
import org.opendc.compute.service.scheduler.filters.VCpuFilter
import org.opendc.compute.service.scheduler.weights.CoreRamWeigher
+import org.opendc.compute.simulator.SimHost
import org.opendc.experiments.capelin.env.ClusterEnvironmentReader
import org.opendc.experiments.capelin.env.EnvironmentReader
import org.opendc.experiments.capelin.model.Workload
@@ -52,7 +51,7 @@ import java.io.File
import java.util.*
/**
- * An integration test suite for the SC20 experiments.
+ * An integration test suite for the Capelin experiments.
*/
class CapelinIntegrationTest {
/**
@@ -73,9 +72,6 @@ class CapelinIntegrationTest {
*/
@Test
fun testLarge() = runBlockingSimulation {
- val failures = false
- val seed = 0
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -85,31 +81,14 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain = if (failures) {
- println("ENABLING failures")
- createFailureDomain(
- this,
- clock,
- seed,
- 24.0 * 7,
- scheduler,
- chan
- )
- } else {
- null
- }
-
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
processTrace(
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
-
- failureDomain?.cancel()
}
val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
@@ -141,12 +120,11 @@ class CapelinIntegrationTest {
@Test
fun testSmall() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
)
- val traceReader = createTestTraceReader(0.5, seed)
+ val traceReader = createTestTraceReader(0.25, seed)
val environmentReader = createTestEnvironmentReader("single")
val meterProvider = createMeterProvider(clock)
@@ -157,7 +135,6 @@ class CapelinIntegrationTest {
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
@@ -174,9 +151,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -187,7 +164,6 @@ class CapelinIntegrationTest {
@Test
fun testInterference() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -209,7 +185,6 @@ class CapelinIntegrationTest {
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
@@ -226,10 +201,10 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
- { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
+ { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
@@ -239,7 +214,6 @@ class CapelinIntegrationTest {
@Test
fun testFailures() = runBlockingSimulation {
val seed = 1
- val chan = Channel<Unit>(Channel.CONFLATED)
val allocationPolicy = FilterScheduler(
filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)),
weighers = listOf(CoreRamWeigher(multiplier = 1.0))
@@ -250,27 +224,26 @@ class CapelinIntegrationTest {
val meterProvider = createMeterProvider(clock)
withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler ->
- val failureDomain =
- createFailureDomain(
- this,
+ val faultInjector =
+ createFaultInjector(
+ coroutineContext,
clock,
+ scheduler.hosts.map { it as SimHost }.toSet(),
seed,
24.0 * 7,
- scheduler,
- chan
)
withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) {
+ faultInjector.start()
processTrace(
clock,
traceReader,
scheduler,
- chan,
monitor
)
}
- failureDomain.cancel()
+ faultInjector.close()
}
val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer)
@@ -284,9 +257,9 @@ class CapelinIntegrationTest {
// Note that these values have been verified beforehand
assertAll(
- { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } },
- { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } },
- { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
+ { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } },
+ { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } },
+ { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } },
{ assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } }
)
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
index 53b3c2d7..5642003d 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
+++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt
@@ -1,3 +1,3 @@
ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost
-A01;A01;8;3.2;64;1;64;8
+A01;A01;8;3.2;128;1;128;8