summary | refs | log | tree | commit | diff
path: root/opendc/opendc-experiments-sc20/src/main
diff options
context:
space:
mode:
author: Georgios Andreadis <g.andreadis@student.tudelft.nl> 2020-03-26 13:55:32 +0100
committer: Georgios Andreadis <g.andreadis@student.tudelft.nl> 2020-03-26 13:55:32 +0100
commit: 620f194c53d950a37f78577f4aacfd7c0c06bb9a (patch)
tree: f5f7ffdce8efdcffb92e158ebbb643ba1a797b23 /opendc/opendc-experiments-sc20/src/main
parent: f4ee29bb97aed68329e72710dd3049c23f592f25 (diff)
parent: 7eb8177e2278bde2c0f4fad00af6fdd2d632cb5b (diff)
Merge branch 'feat/2.x-failures' into '2.x'
Implement basic hardware-level failures. See merge request opendc/opendc-simulator!35
Diffstat (limited to 'opendc/opendc-experiments-sc20/src/main')
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt (renamed from opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt) 23
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt 84
2 files changed, 85 insertions, 22 deletions
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
index 9e8f0fa8..36da7703 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20HypervisorMonitor.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt
@@ -1,23 +1,32 @@
package com.atlarge.opendc.experiments.sc20
+import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Server
+import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.metal.driver.BareMetalDriver
-import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import kotlinx.coroutines.flow.first
import java.io.BufferedWriter
import java.io.Closeable
import java.io.FileWriter
-class Sc20HypervisorMonitor(
+class Sc20Monitor(
destination: String
-) : HypervisorMonitor, Closeable {
+) : Closeable {
private val outputFile = BufferedWriter(FileWriter(destination))
+ private var failed: Int = 0
init {
- outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw\n")
+ outputFile.write("time,requestedBurst,grantedBurst,numberOfDeployedImages,server,hostUsage,powerDraw,failedVms\n")
}
- override suspend fun onSliceFinish(
+ suspend fun stateChanged(server: Server) { // Logs the server's current state and counts ERROR states.
+ println("[${simulationContext.clock.millis()}] ${server.uid} ${server.state}") // Printed as: [simulation clock millis] uid state
+ if (server.state == ServerState.ERROR) {
+ failed++ // Running count; onSliceFinish writes it as the final CSV column.
+ }
+ }
+
+ suspend fun onSliceFinish(
time: Long,
requestedBurst: Long,
grantedBurst: Long,
@@ -25,11 +34,11 @@ class Sc20HypervisorMonitor(
hostServer: Server
) {
// Assume for now that the host is not virtualized and measure the current power draw
- val driver = hostServer.serviceRegistry[BareMetalDriver.Key]
+ val driver = hostServer.services[BareMetalDriver.Key]
val usage = driver.usage.first()
val powerDraw = driver.powerDraw.first()
- outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw")
+ outputFile.write("$time,$requestedBurst,$grantedBurst,$numberOfDeployedImages,${hostServer.uid},$usage,$powerDraw,$failed")
outputFile.newLine()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index f0d3fc8d..639c3aef 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -24,15 +24,19 @@
package com.atlarge.opendc.experiments.sc20
+import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.SimulationEngineProvider
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
-import com.atlarge.opendc.compute.core.Server
-import com.atlarge.opendc.compute.core.ServerState
-import com.atlarge.opendc.compute.core.monitor.ServerMonitor
+import com.atlarge.opendc.compute.core.ServerEvent
+import com.atlarge.opendc.compute.metal.NODE_CLUSTER
import com.atlarge.opendc.compute.metal.service.ProvisioningService
+import com.atlarge.opendc.compute.virt.HypervisorEvent
import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService
import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy
+import com.atlarge.opendc.core.failure.CorrelatedFaultInjector
+import com.atlarge.opendc.core.failure.FailureDomain
+import com.atlarge.opendc.core.failure.FaultInjector
import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader
import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader
import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader
@@ -40,11 +44,15 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.File
-import java.io.FileInputStream
import java.io.FileReader
import java.util.ServiceLoader
import kotlin.math.max
@@ -81,27 +89,35 @@ class ExperimentParameters(parser: ArgParser) {
}
/**
+ * Obtain the [FaultInjector] to use for the experiments.
+ *
+ * Builds a [CorrelatedFaultInjector] for the given domain using fixed `iat*` and `size*` parameters.
+ *
+ * @param domain The [Domain] handed to the [CorrelatedFaultInjector] constructor.
+ * @return A newly constructed [CorrelatedFaultInjector].
+ */
+fun createFaultInjector(domain: Domain): FaultInjector {
+ // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
+ return CorrelatedFaultInjector(domain,
+ iatScale = -1.39, iatShape = 1.03, // NOTE(review): iatScale is negative; presumably a location-style parameter. Confirm against CorrelatedFaultInjector.
+ sizeScale = 1.88, sizeShape = 1.25
+ )
+}
+
+/**
* Main entry point of the experiment.
*/
+@OptIn(ExperimentalCoroutinesApi::class)
fun main(args: Array<String>) {
ArgParser(args).parseInto(::ExperimentParameters).run {
- val hypervisorMonitor = Sc20HypervisorMonitor(outputFile)
- val monitor = object : ServerMonitor {
- override suspend fun onUpdate(server: Server, previousState: ServerState) {
- println(server)
- }
- }
+ val monitor = Sc20Monitor(outputFile)
val provider = ServiceLoader.load(SimulationEngineProvider::class.java).first()
val system = provider("test")
val root = system.newDomain("root")
+ val chan = Channel<Unit>(Channel.CONFLATED)
root.launch {
val environment = Sc20ClusterEnvironmentReader(File(environmentFile))
.use { it.construct(root) }
val performanceInterferenceStream = if (performanceInterferenceFile != null) {
- FileInputStream(File(performanceInterferenceFile!!))
+ File(performanceInterferenceFile!!).inputStream().buffered()
} else {
object {}.javaClass.getResourceAsStream("/env/performance-interference.json")
}
@@ -111,18 +127,56 @@ fun main(args: Array<String>) {
println(simulationContext.clock.instant())
+ val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key]
+
+ // Wait for the bare metal nodes to be spawned
+ delay(10)
+
val scheduler = SimpleVirtProvisioningService(
AvailableMemoryAllocationPolicy(),
simulationContext,
- environment.platforms[0].zones[0].services[ProvisioningService.Key],
- hypervisorMonitor
+ bareMetalProvisioner
)
+ // Wait for the hypervisors to be spawned
+ delay(10)
+
+ // Monitor hypervisor events
+ for (hypervisor in scheduler.drivers()) {
+ hypervisor.events
+ .onEach { event ->
+ when (event) {
+ is HypervisorEvent.SliceFinished -> monitor.onSliceFinish(simulationContext.clock.millis(), event.requestedBurst, event.grantedBurst, event.numberOfDeployedImages, event.hostServer)
+ else -> println(event)
+ }
+ }
+ .launchIn(this)
+ }
+
+ root.newDomain(name = "failures").launch {
+ chan.receive()
+ val injectors = mutableMapOf<String, FaultInjector>()
+
+ for (node in bareMetalProvisioner.nodes()) {
+ val cluster = node.metadata[NODE_CLUSTER] as String
+ val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) }
+ injector.enqueue(node.metadata["driver"] as FailureDomain)
+ }
+ }
+
val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList())
while (reader.hasNext()) {
val (time, workload) = reader.next()
delay(max(0, time - simulationContext.clock.millis()))
- scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory))
+ launch {
+ chan.send(Unit)
+ val server = scheduler.deploy(
+ workload.image.name, workload.image,
+ Flavor(workload.image.cores, workload.image.requiredMemory)
+ )
+ // Monitor server events
+ server.events.onEach { if (it is ServerEvent.StateChanged) monitor.stateChanged(it.server) }.collect()
+ }
}
println(simulationContext.clock.instant())
@@ -134,6 +188,6 @@ fun main(args: Array<String>) {
}
// Explicitly close the monitor to flush its buffer
- hypervisorMonitor.close()
+ monitor.close()
}
}