From c4016fcfd37550b237f6940eaffb5b4efd607601 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 3 Apr 2020 17:05:05 +0200 Subject: feat: Add initial prototype for failure recovery --- .../compute/metal/driver/SimpleBareMetalDriver.kt | 26 ++++++++++++++-------- .../atlarge/opendc/compute/virt/HypervisorImage.kt | 2 +- .../opendc/compute/virt/driver/SimpleVirtDriver.kt | 23 ++++++++++++------- .../opendc/core/failure/CorrelatedFaultInjector.kt | 24 ++++++++++++++++++-- .../atlarge/opendc/core/failure/FailureDomain.kt | 5 +++++ .../opendc/experiments/sc20/TestExperiment.kt | 23 +++++++++++++------ 6 files changed, 76 insertions(+), 27 deletions(-) (limited to 'opendc') diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 9396699a..cec1d1a7 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch @@ -182,6 +181,10 @@ public class SimpleBareMetalDriver( private inner class BareMetalServerContext(val events: EventFlow) : ServerManagementContext { private var finalized: Boolean = false + // A state in which the machine is still available, but does not run any of the work requested by the + // image + var unavailable = false + override val cpus: List = this@SimpleBareMetalDriver.cpus override val server: Server @@ -266,7 +269,9 @@ public class SimpleBareMetalDriver( } } - usageState.value = totalUsage / cpus.size + if (!unavailable) { + usageState.value = totalUsage / cpus.size + } try { delay(duration) @@ -276,7 +281,7 @@ public class SimpleBareMetalDriver( } val end = simulationContext.clock.millis() - // Flush the load if the do not receive a new run call for the same timestamp + // Flush the load if they do not receive a new run call for the same timestamp flush = domain.launch(job) { delay(1) usageState.value = 0.0 @@ -285,6 +290,10 @@ public class SimpleBareMetalDriver( flush = null } + if (unavailable) { + return + } + // Write back the remaining burst time for (i in 0 until min(cpus.size, burst.size)) { val usage = min(limit[i], cpus[i].frequency) @@ -298,12 +307,11 @@ public class SimpleBareMetalDriver( get() = domain override suspend fun fail() { - try { - serverContext?.cancel(fail = true) - domain.cancel() - } catch (_: CancellationException) { - // Ignore if the machine has already failed. - } + // serverContext?.unavailable = true + } + + override suspend fun recover() { + // serverContext?.unavailable = false } override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})" diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt index c21b002d..bd395f0d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt @@ -50,7 +50,7 @@ object HypervisorImage : Image { try { suspendCancellableCoroutine {} } finally { - driver.eventFlow.close() + driver.cancel() } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt index b4626def..c21a9fc0 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt @@ -65,8 +65,8 @@ import kotlin.math.min @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class) class SimpleVirtDriver( private val hostContext: ServerContext, - private val coroutineScope: CoroutineScope -) : VirtDriver { + scope: CoroutineScope +) : VirtDriver, CoroutineScope by scope { /** * The [Server] on which this hypervisor runs. */ @@ -98,7 +98,7 @@ class SimpleVirtDriver( it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel? performanceModel?.computeIntersectingItems(imagesRunning) } - }.launchIn(coroutineScope) + }.launchIn(this) } override suspend fun spawn( @@ -123,6 +123,10 @@ class SimpleVirtDriver( return server } + internal fun cancel() { + eventFlow.close() + } + /** * A flag to indicate the driver is stopped. */ @@ -141,7 +145,7 @@ class SimpleVirtDriver( /** * Schedule the vCPUs on the physical CPUs. */ - private suspend fun reschedule() { + private fun reschedule() { flush() // Do not schedule a call if there is no work to schedule or the driver stopped. @@ -149,7 +153,7 @@ class SimpleVirtDriver( return } - val call = coroutineScope.launch { + val call = launch { val start = simulationContext.clock.millis() val vms = activeVms.toSet() @@ -219,7 +223,7 @@ class SimpleVirtDriver( val fraction = req.allocatedUsage / totalUsage // Derive the burst that was allocated to this vCPU - val allocatedBurst = ceil(duration * req.allocatedUsage).toLong() + val allocatedBurst = ceil(totalBurst * fraction).toLong() // Compute the burst time that the VM was actually granted val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong() @@ -244,6 +248,9 @@ class SimpleVirtDriver( server ) ) + + // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request) + reschedule() } this.call = call } @@ -286,7 +293,7 @@ class SimpleVirtDriver( var chan = Channel(Channel.RENDEZVOUS) private var initialized: Boolean = false - internal val job: Job = coroutineScope.launch { + internal val job: Job = launch { delay(1) // TODO Introduce boot time init() try { @@ -331,8 +338,8 @@ class SimpleVirtDriver( server = server.copy(state = serverState) availableMemory += server.flavor.memorySize vms.remove(this) - events.close() eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory)) + events.close() } override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt index c5189764..f363bf45 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt @@ -44,6 +44,8 @@ public class CorrelatedFaultInjector( private val iatShape: Double, private val sizeScale: Double, private val sizeShape: Double, + private val dScale: Double, + private val dShape: Double, random: Random = Random(0) ) : FaultInjector { /** @@ -84,7 +86,7 @@ public class CorrelatedFaultInjector( } job = this.domain.launch { - while (true) { + while (active.isNotEmpty()) { ensureActive() // Make sure to convert delay from hours to milliseconds @@ -98,10 +100,28 @@ public class CorrelatedFaultInjector( delay(d.toLong()) val n = lognvariate(sizeScale, sizeShape).toInt() - for (failureDomain in active.shuffled(random).take(n)) { + val targets = active.shuffled(random).take(n) + for (failureDomain in targets) { + active -= failureDomain failureDomain.fail() } + + val df = lognvariate(dScale, dShape) * 3600 * 1e6 + + // Handle long overflow + if (simulationContext.clock.millis() + df <= 0) { + return@launch + } + + delay(df.toLong()) + + for (failureDomain in targets) { + failureDomain.recover() + enqueue(failureDomain) + } } + + job = null } } diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt index 91ca9b83..d56df3c9 100644 --- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt +++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt @@ -39,4 +39,9 @@ public interface FailureDomain { * Fail the domain externally. */ public suspend fun fail() + + /** + * Resume the failure domain. + */ + public suspend fun recover() } diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt index cc403e6e..400cef33 100644 --- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt +++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt @@ -52,6 +52,7 @@ import com.fasterxml.jackson.module.kotlin.readValue import com.xenomachina.argparser.ArgParser import com.xenomachina.argparser.default import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect @@ -63,6 +64,7 @@ import java.io.File import java.io.FileReader import java.util.ServiceLoader import kotlin.math.max +import kotlin.random.Random class ExperimentParameters(parser: ArgParser) { val traceDirectory by parser.storing("path to the trace directory") @@ -100,11 +102,13 @@ class ExperimentParameters(parser: ArgParser) { /** * Obtain the [FaultInjector] to use for the experiments. */ -fun createFaultInjector(domain: Domain): FaultInjector { +fun createFaultInjector(domain: Domain, random: Random): FaultInjector { // Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009 return CorrelatedFaultInjector(domain, iatScale = -1.39, iatShape = 1.03, - sizeScale = 1.88, sizeShape = 1.25 + sizeScale = 1.88, sizeShape = 1.25, + dScale = 1.88, dShape = 1.25, + random = random ) } @@ -202,18 +206,22 @@ fun main(args: Array) { .launchIn(this) } - if (failures) { - println("ENABLE Failures") - root.newDomain(name = "failures").launch { + val failureDomain = if (failures) { + println("ENABLING failures") + val domain = root.newDomain(name = "failures") + domain.launch { chan.receive() + val random = Random(0) val injectors = mutableMapOf() - for (node in bareMetalProvisioner.nodes()) { val cluster = node.metadata[NODE_CLUSTER] as String - val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) } + val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) } injector.enqueue(node.metadata["driver"] as FailureDomain) } } + domain + } else { + null } val running = mutableSetOf() @@ -250,6 +258,7 @@ fun main(args: Array) { finish.receive() scheduler.terminate() + failureDomain?.cancel() println(simulationContext.clock.instant()) } -- cgit v1.2.3