diff options
| author | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-26 13:55:32 +0100 |
|---|---|---|
| committer | Georgios Andreadis <g.andreadis@student.tudelft.nl> | 2020-03-26 13:55:32 +0100 |
| commit | 620f194c53d950a37f78577f4aacfd7c0c06bb9a (patch) | |
| tree | f5f7ffdce8efdcffb92e158ebbb643ba1a797b23 /opendc/opendc-experiments-sc20/src/main | |
| parent | f4ee29bb97aed68329e72710dd3049c23f592f25 (diff) | |
| parent | 7eb8177e2278bde2c0f4fad00af6fdd2d632cb5b (diff) | |
Merge branch 'feat/2.x-failures' into '2.x'
Implement basic hardware-level failures
See merge request opendc/opendc-simulator!35
Diffstat (limited to 'opendc/opendc-experiments-sc20/src/main')
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt) | 23 | ||||
| -rw-r--r-- | opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 84 |
2 files changed, 85 insertions, 22 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 9e8f0fa8..36da7703 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,23 +1,32 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server +import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.metal.driver.BareMetalDriver -import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import kotlinx.coroutines.flow.first import java.io.BufferedWriter import java.io.Closeable import java.io.FileWriter -class Sc20HypervisorMonitor( +class Sc20Monitor( destination: String -) : HypervisorMonitor, Closeable { +) : Closeable { private val outputFile = BufferedWriter(FileWriter(destination)) + private var failed: Int = 0 init { - outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n") + outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n") } - override suspend fun onSliceFinish( + suspend fun stateChanged(server: Server) { + println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}") + if (server.state == ServerState.ERROR) { + failed++ + } + } + + suspend fun onSliceFinish( time: Long, requestedBurst: Long, grantedBurst: Long, @@ -25,11 +34,11 @@ class Sc20HypervisorMonitor( hostServer: Server ) { // Assume for now that the host is not virtualized and measure the current power draw - val driver = hostServer.serviceRegistry[BareMetalDriver.Key] + val driver = hostServer.services[BareMetalDriver.Key] val usage = driver.usage.first() val powerDraw = driver.powerDraw.first() - outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw") + outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed") outputFile.newLine() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index f0d3fc8d..639c3aef 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -24,15 +24,19 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.Domain import com.atlarge.odcsim.SimulationEngineProvider import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor -import com.atlarge.opendc.compute.core.Server -import com.atlarge.opendc.compute.core.ServerState -import com.atlarge.opendc.compute.core.monitor.ServerMonitor +import com.atlarge.opendc.compute.core.ServerEvent +import com.atlarge.opendc.compute.metal.NODE_CLUSTER import com.atlarge.opendc.compute.metal.service.ProvisioningService +import com.atlarge.opendc.compute.virt.HypervisorEvent import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.CorrelatedFaultInjector +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.FaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -40,11 +44,15 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File -import java.io.FileInputStream import java.io.FileReader import java.util.ServiceLoader import kotlin.math.max @@ -81,27 +89,35 @@ class ExperimentParameters(parser: ArgParser) { } /** + * Obtain the [FaultInjector] to use for the experiments. + */ +fun createFaultInjector(domain: Domain): FaultInjector { + // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 + return CorrelatedFaultInjector(domain, + iatScale = -1.39, iatShape = 1.03, + sizeScale = 1.88, sizeShape = 1.25 + ) +} + +/** * Main entry point of the experiment. */ +@OptIn(ExperimentalCoroutinesApi::class) fun main(args: Array<String>) { ArgParser(args).parseInto(::ExperimentParameters).run { - val hypervisorMonitor = Sc20HypervisorMonitor(outputFile) - val monitor = object : ServerMonitor { - override suspend fun onUpdate(server: Server, previousState: ServerState) { - println(server) - } - } + val monitor = Sc20Monitor(outputFile) val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first() val system = provider("test") val root = system.newDomain("root") + val chan = Channel<Unit>(Channel.CONFLATED) root.launch { val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) .use { it.construct(root) } val performanceInterferenceStream = if (performanceInterferenceFile != null) { - FileInputStream(File(performanceInterferenceFile!!)) + File(performanceInterferenceFile!!).inputStream().buffered() } else { object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } @@ -111,18 +127,56 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + + // Wait for the bare metal nodes to be spawned + delay(10) + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - environment.platforms[0].zones[0].services[ProvisioningService.Key], - hypervisorMonitor + bareMetalProvisioner ) + // Wait for the hypervisors to be spawned + delay(10) + + // Monitor hypervisor events + for (hypervisor in scheduler.drivers()) { + hypervisor.events + .onEach { event -> + when (event) { + is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer) + else -> println(event) + } + } + .launchIn(this) + } + + root.newDomain(name = "failures").launch { + chan.receive() + val injectors = mutableMapOf<String, FaultInjector>() + + for (node in bareMetalProvisioner.nodes()) { + val cluster = node.metadata[NODE_CLUSTER] as String + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) } + injector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) - scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) + launch { + chan.send(Unit) + val server = scheduler.deploy( + workload.image.name, workload.image, + Flavor(workload.image.cores, workload.image.requiredMemory) + ) + // Monitor server events + server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect() + } } println(simulationContext.clock.instant()) @@ -134,6 +188,6 @@ fun main(args: Array<String>) { } // Explicitly close the monitor to flush its buffer - hypervisorMonitor.close() + monitor.close() } } |
