diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-16 23:07:22 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:48:58 +0100 |
| commit | 5ff443c799322836d532fffb3ff8f720806c32b6 (patch) | |
| tree | 6844c2c1cd3b22ff3469d7c359cf47f558400ebd | |
| parent | a5d22796a95b187bc07cbd55a2289185bd9092b8 (diff) | |
feat: Add failures to SC20 experiment
6 files changed, 34 insertions, 13 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 7df5d99b..55948d3c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -44,6 +44,11 @@ data class Node( public override val name: String, /** + * Meta data of the node. + */ + public val metadata: Map<String, Any> = emptyMap(), + + /** * The last known state of the compute node. */ public val state: NodeState, diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 4fe8d740..4b9a03a6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -81,7 +81,7 @@ public class SimpleBareMetalDriver( /** * The machine state. */ - private var node: Node = Node(uid, name, NodeState.SHUTOFF, EmptyImage, null) + private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null) set(value) { if (field.state != value.state) { domain.launch { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index f0bb4e25..9fad2396 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -44,8 +44,8 @@ class SimpleVirtProvisioningService( init { ctx.domain.launch { - val provisionedNodes = provisioningService.nodes().toList() - val deployedNodes = provisionedNodes.map { node -> + val provisionedNodes = provisioningService.nodes() + provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage(hypervisorMonitor) val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) val server = deployedNode.server!! @@ -55,6 +55,7 @@ class SimpleVirtProvisioningService( 0, server.flavor.memorySize ) + hypervisors[server] = hvView yield() server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { @@ -62,9 +63,7 @@ class SimpleVirtProvisioningService( hvView.availableMemory = availableMemory } }) - server to hvView } - hypervisors.putAll(deployedNodes) } } @@ -84,9 +83,10 @@ class SimpleVirtProvisioningService( val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - println("Spawning $imageInstance") val selectedNode = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break try { + println("Spawning ${imageInstance.image}") + incomingImages -= imageInstance imageInstance.server = selectedNode.server.serviceRegistry[VirtDriver.Key].spawn( imageInstance.image, imageInstance.monitor, @@ -97,7 +97,6 @@ class SimpleVirtProvisioningService( println("Unable to deploy image due to insufficient memory") } - incomingImages -= imageInstance } } @@ -117,7 +116,7 @@ class SimpleVirtProvisioningService( } } - class ImageView( + data class ImageView( val image: Image, val monitor: ServerMonitor, val flavor: Flavor, diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 95127deb..5155a25a 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -33,13 +33,14 @@ import kotlin.random.Random * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are * independent. */ -public class UncorrelatedFaultInjector(private val mu: Double = 256.0, private val random: Random = Random.Default) : FaultInjector { +public class UncorrelatedFaultInjector(private val mu: Double = 1024.0, private val random: Random = Random.Default) : FaultInjector { /** * Enqueue the specified [FailureDomain] to fail some time in the future. */ override fun enqueue(domain: FailureDomain) { domain.scope.launch { - delay(random.expovariate(mu)) + val d = random.expovariate(mu) + delay(d) domain.fail() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 6ce9cefa..d3b2d5c6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,5 +1,6 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index e47438f0..a1619fe2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -30,6 +30,8 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.UncorrelatedFaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -37,11 +39,11 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File -import java.io.FileInputStream import java.io.FileReader import java.util.ServiceLoader import kotlin.math.max @@ -88,12 +90,14 @@ fun main(args: Array<String>) { val system = provider("test") val root = system.newDomain("root") + val chan = Channel<Unit>(Channel.CONFLATED) + root.launch { val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) .use { it.construct(root) } val performanceInterferenceStream = if (performanceInterferenceFile != null) { - FileInputStream(File(performanceInterferenceFile!!)) + File(performanceInterferenceFile!!).inputStream().buffered() } else { object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } @@ -103,17 +107,28 @@ fun main(args: Array<String>) { println(simulationContext.clock.instant()) + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - environment.platforms[0].zones[0].services[ProvisioningService.Key], + bareMetalProvisioner, monitor ) + root.launch { + chan.receive() + val faultInjector = UncorrelatedFaultInjector(mu = 2e7) + for (node in bareMetalProvisioner.nodes()) { + faultInjector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) + chan.send(Unit) scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) } |
