From 5ff443c799322836d532fffb3ff8f720806c32b6 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 16 Mar 2020 23:07:22 +0100 Subject: feat: Add failures to SC20 experiment --- .../kotlin/com/atlarge/opendc/compute/metal/Node.kt | 5 +++++ .../compute/metal/driver/SimpleBareMetalDriver.kt | 2 +- .../virt/service/SimpleVirtProvisioningService.kt | 13 ++++++------- .../core/failure/UncorrelatedFaultInjector.kt | 5 +++-- .../atlarge/opendc/experiments/sc20/Sc20Monitor.kt | 1 + .../opendc/experiments/sc20/TestExperiment.kt | 21 ++++++++++++++++++--- 6 files changed, 34 insertions(+), 13 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt index 7df5d99b..55948d3c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/Node.kt @@ -43,6 +43,11 @@ data class Node( */ public override val name: String, + /** + * Meta data of the node. + */ + public val metadata: Map = emptyMap(), + /** * The last known state of the compute node. */ diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 4fe8d740..4b9a03a6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -81,7 +81,7 @@ public class SimpleBareMetalDriver( /** * The machine state. */ - private var node: Node = Node(uid, name, NodeState.SHUTOFF, EmptyImage, null) + private var node: Node = Node(uid, name, mapOf("driver" to this), NodeState.SHUTOFF, EmptyImage, null) set(value) { if (field.state != value.state) { domain.launch { diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt index f0bb4e25..9fad2396 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt @@ -44,8 +44,8 @@ class SimpleVirtProvisioningService( init { ctx.domain.launch { - val provisionedNodes = provisioningService.nodes().toList() - val deployedNodes = provisionedNodes.map { node -> + val provisionedNodes = provisioningService.nodes() + provisionedNodes.forEach { node -> val hypervisorImage = HypervisorImage(hypervisorMonitor) val deployedNode = provisioningService.deploy(node, hypervisorImage, this@SimpleVirtProvisioningService) val server = deployedNode.server!! @@ -55,6 +55,7 @@ class SimpleVirtProvisioningService( 0, server.flavor.memorySize ) + hypervisors[server] = hvView yield() server.serviceRegistry[VirtDriver.Key].addMonitor(object : VirtDriverMonitor { override suspend fun onUpdate(numberOfActiveServers: Int, availableMemory: Long) { @@ -62,9 +63,7 @@ class SimpleVirtProvisioningService( hvView.availableMemory = availableMemory } }) - server to hvView } - hypervisors.putAll(deployedNodes) } } @@ -84,9 +83,10 @@ class SimpleVirtProvisioningService( val imagesToBeScheduled = incomingImages.toSet() for (imageInstance in imagesToBeScheduled) { - println("Spawning $imageInstance") val selectedNode = availableHypervisors.minWith(allocationPolicy().thenBy { it.server.uid }) ?: break try { + println("Spawning ${imageInstance.image}") + incomingImages -= imageInstance imageInstance.server = selectedNode.server.serviceRegistry[VirtDriver.Key].spawn( imageInstance.image, imageInstance.monitor, @@ -97,7 +97,6 @@ class SimpleVirtProvisioningService( println("Unable to deploy image due to insufficient memory") } - incomingImages -= imageInstance } } @@ -117,7 +116,7 @@ class SimpleVirtProvisioningService( } } - class ImageView( + data class ImageView( val image: Image, val monitor: ServerMonitor, val flavor: Flavor, diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt index 95127deb..5155a25a 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/UncorrelatedFaultInjector.kt @@ -33,13 +33,14 @@ import kotlin.random.Random * A [FaultInjector] that injects uncorrelated faults into the system, meaning that failures of the subsystems are * independent. */ -public class UncorrelatedFaultInjector(private val mu: Double = 256.0, private val random: Random = Random.Default) : FaultInjector { +public class UncorrelatedFaultInjector(private val mu: Double = 1024.0, private val random: Random = Random.Default) : FaultInjector { /** * Enqueue the specified [FailureDomain] to fail some time in the future. */ override fun enqueue(domain: FailureDomain) { domain.scope.launch { - delay(random.expovariate(mu)) + val d = random.expovariate(mu) + delay(d) domain.fail() } } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt index 6ce9cefa..d3b2d5c6 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/Sc20Monitor.kt @@ -1,5 +1,6 @@ package com.atlarge.opendc.experiments.sc20 +import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.monitor.ServerMonitor diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index e47438f0..a1619fe2 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -30,6 +30,8 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.metal.service.ProvisioningService import com.atlarge.opendc.compute.virt.service.SimpleVirtProvisioningService import com.atlarge.opendc.compute.virt.service.allocation.AvailableMemoryAllocationPolicy +import com.atlarge.opendc.core.failure.FailureDomain +import com.atlarge.opendc.core.failure.UncorrelatedFaultInjector import com.atlarge.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import com.atlarge.opendc.format.trace.sc20.Sc20PerformanceInterferenceReader import com.atlarge.opendc.format.trace.sc20.Sc20TraceReader @@ -37,11 +39,11 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.io.File -import java.io.FileInputStream import java.io.FileReader import java.util.ServiceLoader import kotlin.math.max @@ -88,12 +90,14 @@ fun main(args: Array) { val system = provider("test") val root = system.newDomain("root") + val chan = Channel(Channel.CONFLATED) + root.launch { val environment = Sc20ClusterEnvironmentReader(File(environmentFile)) .use { it.construct(root) } val performanceInterferenceStream = if (performanceInterferenceFile != null) { - FileInputStream(File(performanceInterferenceFile!!)) + File(performanceInterferenceFile!!).inputStream().buffered() } else { object {}.javaClass.getResourceAsStream("/env/performance-interference.json") } @@ -103,17 +107,28 @@ fun main(args: Array) { println(simulationContext.clock.instant()) + val bareMetalProvisioner = environment.platforms[0].zones[0].services[ProvisioningService.Key] + val scheduler = SimpleVirtProvisioningService( AvailableMemoryAllocationPolicy(), simulationContext, - environment.platforms[0].zones[0].services[ProvisioningService.Key], + bareMetalProvisioner, monitor ) + root.launch { + chan.receive() + val faultInjector = UncorrelatedFaultInjector(mu = 2e7) + for (node in bareMetalProvisioner.nodes()) { + faultInjector.enqueue(node.metadata["driver"] as FailureDomain) + } + } + val reader = Sc20TraceReader(File(traceDirectory), performanceInterferenceModel, getSelectedVmList()) while (reader.hasNext()) { val (time, workload) = reader.next() delay(max(0, time - simulationContext.clock.millis())) + chan.send(Unit) scheduler.deploy(workload.image, monitor, Flavor(workload.image.cores, workload.image.requiredMemory)) } -- cgit v1.2.3