diff options
Diffstat (limited to 'opendc-experiments')
8 files changed, 65 insertions, 122 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index b2330af0..036d0638 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -35,7 +35,6 @@ dependencies { implementation(projects.opendcTrace.opendcTraceBitbrains) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) implementation(projects.opendcTelemetry.opendcTelemetryCompute) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 0230409e..8227bca9 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -25,7 +25,8 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel +import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.commons.math3.random.Well19937c import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService import org.opendc.compute.service.scheduler.ComputeScheduler @@ -39,6 +40,9 @@ import org.opendc.compute.service.scheduler.weights.InstanceCountWeigher import org.opendc.compute.service.scheduler.weights.RamWeigher import org.opendc.compute.service.scheduler.weights.VCpuWeigher import org.opendc.compute.simulator.SimHost +import org.opendc.compute.simulator.failure.HostFaultInjector +import org.opendc.compute.simulator.failure.StartStopHostFault +import org.opendc.compute.simulator.failure.StochasticVictimSelector import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.trace.TraceReader import org.opendc.simulator.compute.kernel.SimFairShareHypervisorProvider @@ -46,67 +50,36 @@ import org.opendc.simulator.compute.kernel.interference.VmInterferenceModel import org.opendc.simulator.compute.power.SimplePowerDriver import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.compute.workload.SimWorkload -import org.opendc.simulator.failures.CorrelatedFaultInjector -import org.opendc.simulator.failures.FaultInjector import org.opendc.simulator.resources.SimResourceInterpreter import org.opendc.telemetry.compute.ComputeMonitor import org.opendc.telemetry.sdk.toOtelClock import java.time.Clock -import kotlin.coroutines.resume +import kotlin.coroutines.CoroutineContext import kotlin.math.ln import kotlin.math.max import kotlin.random.Random /** - * Construct the failure domain for the experiments. - */ -fun createFailureDomain( - coroutineScope: CoroutineScope, - clock: Clock, - seed: Int, - failureInterval: Double, - service: ComputeService, - chan: Channel<Unit> -): CoroutineScope { - val job = coroutineScope.launch { - chan.receive() - val random = Random(seed) - val injectors = mutableMapOf<String, FaultInjector>() - for (host in service.hosts) { - val cluster = host.meta["cluster"] as String - val injector = - injectors.getOrPut(cluster) { - createFaultInjector( - this, - clock, - random, - failureInterval - ) - } - injector.enqueue(host as SimHost) - } - } - return CoroutineScope(coroutineScope.coroutineContext + job) -} - -/** * Obtain the [FaultInjector] to use for the experiments. */ fun createFaultInjector( - coroutineScope: CoroutineScope, + context: CoroutineContext, clock: Clock, - random: Random, + hosts: Set<SimHost>, + seed: Int, failureInterval: Double -): FaultInjector { +): HostFaultInjector { + val rng = Well19937c(seed) + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 // GRID'5000 - return CorrelatedFaultInjector( - coroutineScope, + return HostFaultInjector( + context, clock, - iatScale = ln(failureInterval), iatShape = 1.03, // Hours - sizeScale = ln(2.0), sizeShape = ln(1.0), // Expect 2 machines, with variation of 1 - dScale = ln(60.0), dShape = ln(60.0 * 8), // Minutes - random = random + hosts, + iat = LogNormalDistribution(rng, ln(failureInterval), 1.03), + selector = StochasticVictimSelector(LogNormalDistribution(rng, 1.88, 1.25), Random(seed)), + fault = StartStopHostFault(LogNormalDistribution(rng, 8.89, 2.71)) ) } @@ -162,14 +135,22 @@ suspend fun processTrace( clock: Clock, reader: TraceReader<SimWorkload>, scheduler: ComputeService, - chan: Channel<Unit>, monitor: ComputeMonitor? = null, ) { val client = scheduler.newClient() + val watcher = object : ServerWatcher { + override fun onStateChanged(server: Server, newState: ServerState) { + monitor?.onStateChange(clock.millis(), server, newState) + } + } + + // Create new image for the virtual machine val image = client.newImage("vm-image") - var offset = Long.MIN_VALUE + try { coroutineScope { + var offset = Long.MIN_VALUE + while (reader.hasNext()) { val entry = reader.next() @@ -180,9 +161,11 @@ suspend fun processTrace( // Make sure the trace entries are ordered by submission time assert(entry.start - offset >= 0) { "Invalid trace order" } delay(max(0, (entry.start - offset) - clock.millis())) + launch { - chan.send(Unit) - val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, offset = -offset + 300001) + val workloadOffset = -offset + 300001 + val workload = SimTraceWorkload((entry.meta["workload"] as SimTraceWorkload).trace, workloadOffset) + val server = client.newServer( entry.name, image, @@ -193,18 +176,14 @@ suspend fun processTrace( ), meta = entry.meta + mapOf("workload" to workload) ) + server.watch(watcher) - suspendCancellableCoroutine { cont -> - server.watch(object : ServerWatcher { - override fun onStateChanged(server: Server, newState: ServerState) { - monitor?.onStateChange(clock.millis(), server, newState) + // Wait for the server reach its end time + val endTime = entry.meta["end-time"] as Long + delay(endTime + workloadOffset - clock.millis() + 1) - if (newState == ServerState.TERMINATED) { - cont.resume(Unit) - } - } - }) - } + // Delete the server after reaching the end-time of the virtual machine + server.delete() } } } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 4db04591..82794471 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -25,9 +25,8 @@ package org.opendc.experiments.capelin import com.typesafe.config.ConfigFactory import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel import mu.KotlinLogging +import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.export.parquet.ParquetExportMonitor import org.opendc.experiments.capelin.model.CompositeWorkload @@ -103,7 +102,6 @@ abstract class Portfolio(name: String) : Experiment(name) { val seeder = Random(repeat.toLong()) val environment = ClusterEnvironmentReader(File(config.getString("env-path"), "${topology.name}.txt")) - val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createComputeScheduler(allocationPolicy, seeder.asKotlinRandom(), vmPlacements) val meterProvider = createMeterProvider(clock) @@ -137,31 +135,30 @@ abstract class Portfolio(name: String) : Experiment(name) { ) withComputeService(clock, meterProvider, environment, allocationPolicy, performanceInterferenceModel) { scheduler -> - val failureDomain = if (operationalPhenomena.failureFrequency > 0) { + val faultInjector = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") - createFailureDomain( - this, + createFaultInjector( + coroutineContext, clock, + scheduler.hosts.map { it as SimHost }.toSet(), seeder.nextInt(), operationalPhenomena.failureFrequency, - scheduler, - chan ) } else { null } withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { + faultInjector?.start() processTrace( clock, trace, scheduler, - chan, monitor ) } - failureDomain?.cancel() + faultInjector?.close() monitor.close() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index aed9a4bb..44cf92a8 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -23,8 +23,6 @@ package org.opendc.experiments.capelin import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.Channel import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -34,6 +32,7 @@ import org.opendc.compute.service.scheduler.filters.ComputeFilter import org.opendc.compute.service.scheduler.filters.RamFilter import org.opendc.compute.service.scheduler.filters.VCpuFilter import org.opendc.compute.service.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.SimHost import org.opendc.experiments.capelin.env.ClusterEnvironmentReader import org.opendc.experiments.capelin.env.EnvironmentReader import org.opendc.experiments.capelin.model.Workload @@ -52,7 +51,7 @@ import java.io.File import java.util.* /** - * An integration test suite for the SC20 experiments. + * An integration test suite for the Capelin experiments. */ class CapelinIntegrationTest { /** @@ -73,9 +72,6 @@ class CapelinIntegrationTest { */ @Test fun testLarge() = runBlockingSimulation { - val failures = false - val seed = 0 - val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -85,31 +81,14 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val failureDomain = if (failures) { - println("ENABLING failures") - createFailureDomain( - this, - clock, - seed, - 24.0 * 7, - scheduler, - chan - ) - } else { - null - } - withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { processTrace( clock, traceReader, scheduler, - chan, monitor ) } - - failureDomain?.cancel() } val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) @@ -141,12 +120,11 @@ class CapelinIntegrationTest { @Test fun testSmall() = runBlockingSimulation { val seed = 1 - val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) ) - val traceReader = createTestTraceReader(0.5, seed) + val traceReader = createTestTraceReader(0.25, seed) val environmentReader = createTestEnvironmentReader("single") val meterProvider = createMeterProvider(clock) @@ -157,7 +135,6 @@ class CapelinIntegrationTest { clock, traceReader, scheduler, - chan, monitor ) } @@ -174,9 +151,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -187,7 +164,6 @@ class CapelinIntegrationTest { @Test fun testInterference() = runBlockingSimulation { val seed = 1 - val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -209,7 +185,6 @@ class CapelinIntegrationTest { clock, traceReader, scheduler, - chan, monitor ) } @@ -226,10 +201,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, - { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } } + { assertEquals(39183961335, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(35649903197, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(1043641877, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(2960970230, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -239,7 +214,6 @@ class CapelinIntegrationTest { @Test fun testFailures() = runBlockingSimulation { val seed = 1 - val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), weighers = listOf(CoreRamWeigher(multiplier = 1.0)) @@ -250,27 +224,26 @@ class CapelinIntegrationTest { val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> - val failureDomain = - createFailureDomain( - this, + val faultInjector = + createFaultInjector( + coroutineContext, clock, + scheduler.hosts.map { it as SimHost }.toSet(), seed, 24.0 * 7, - scheduler, - chan ) withMonitor(scheduler, clock, meterProvider as MetricProducer, monitor) { + faultInjector.start() processTrace( clock, traceReader, scheduler, - chan, monitor ) } - failureDomain.cancel() + faultInjector.close() } val serviceMetrics = collectServiceMetrics(clock.millis(), meterProvider as MetricProducer) @@ -284,9 +257,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } }, - { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } }, - { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(38385852453, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34886665781, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(979997253, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt index 53b3c2d7..5642003d 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/resources/env/single.txt @@ -1,3 +1,3 @@ ClusterID;ClusterName;Cores;Speed;Memory;numberOfHosts;memoryCapacityPerHost;coreCountPerHost -A01;A01;8;3.2;64;1;64;8 +A01;A01;8;3.2;128;1;128;8 diff --git a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts index 40ac2967..cc58e5f1 100644 --- a/opendc-experiments/opendc-experiments-energy21/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-energy21/build.gradle.kts @@ -33,7 +33,6 @@ dependencies { api(projects.opendcHarness.opendcHarnessApi) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcExperiments.opendcExperimentsCapelin) implementation(projects.opendcTelemetry.opendcTelemetrySdk) diff --git a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt index 02aaab3c..d9194969 100644 --- a/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt +++ b/opendc-experiments/opendc-experiments-energy21/src/main/kotlin/org/opendc/experiments/energy21/EnergyExperiment.kt @@ -26,7 +26,6 @@ import com.typesafe.config.ConfigFactory import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import mu.KotlinLogging import org.opendc.compute.service.ComputeService @@ -81,7 +80,6 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { private val powerModel by anyOf(PowerModelType.LINEAR, PowerModelType.CUBIC, PowerModelType.INTERPOLATION) override fun doRun(repeat: Int): Unit = runBlockingSimulation { - val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = FilterScheduler( filters = listOf(ComputeFilter(), VCpuFilter(1.0), RamFilter(1.0)), weighers = listOf(), @@ -98,7 +96,6 @@ public class EnergyExperiment : Experiment("Energy Modeling 2021") { clock, trace, scheduler, - chan, monitor ) } diff --git a/opendc-experiments/opendc-experiments-radice/build.gradle.kts b/opendc-experiments/opendc-experiments-radice/build.gradle.kts index c1515165..0c716183 100644 --- a/opendc-experiments/opendc-experiments-radice/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-radice/build.gradle.kts @@ -34,7 +34,6 @@ dependencies { implementation(projects.opendcFormat) implementation(projects.opendcSimulator.opendcSimulatorCore) implementation(projects.opendcSimulator.opendcSimulatorCompute) - implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) |
