summary | refs | log | tree | commit | diff
path: root/opendc
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com>, 2020-04-03 17:05:05 +0200
committer: Fabian Mastenbroek <mail.fabianm@gmail.com>, 2020-04-03 17:05:05 +0200
commit: c4016fcfd37550b237f6940eaffb5b4efd607601 (patch)
tree: 1abe286499e20da066bdf9c1dca778fe78ce6017 /opendc
parent: a625066b997cfeeb31c88dddeb17fc67ea75d6e6 (diff)
feat: Add initial prototype for failure recovery
Diffstat (limited to 'opendc')
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 26
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt | 2
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 23
-rw-r--r-- opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt | 24
-rw-r--r-- opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt | 5
-rw-r--r-- opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt | 23
6 files changed, 76 insertions, 27 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 9396699a..cec1d1a7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -182,6 +181,10 @@ public class SimpleBareMetalDriver(
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
+ // A state in which the machine is still available, but does not run any of the work requested by the
+ // image
+ var unavailable = false
+
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override val server: Server
@@ -266,7 +269,9 @@ public class SimpleBareMetalDriver(
}
}
- usageState.value = totalUsage / cpus.size
+ if (!unavailable) {
+ usageState.value = totalUsage / cpus.size
+ }
try {
delay(duration)
@@ -276,7 +281,7 @@ public class SimpleBareMetalDriver(
}
val end = simulationContext.clock.millis()
- // Flush the load if the do not receive a new run call for the same timestamp
+ // Flush the load if they do not receive a new run call for the same timestamp
flush = domain.launch(job) {
delay(1)
usageState.value = 0.0
@@ -285,6 +290,10 @@ public class SimpleBareMetalDriver(
flush = null
}
+ if (unavailable) {
+ return
+ }
+
// Write back the remaining burst time
for (i in 0 until min(cpus.size, burst.size)) {
val usage = min(limit[i], cpus[i].frequency)
@@ -298,12 +307,11 @@ public class SimpleBareMetalDriver(
get() = domain
override suspend fun fail() {
- try {
- serverContext?.cancel(fail = true)
- domain.cancel()
- } catch (_: CancellationException) {
- // Ignore if the machine has already failed.
- }
+ // serverContext?.unavailable = true
+ }
+
+ override suspend fun recover() {
+ // serverContext?.unavailable = false
}
override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index c21b002d..bd395f0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -50,7 +50,7 @@ object HypervisorImage : Image {
try {
suspendCancellableCoroutine<Unit> {}
} finally {
- driver.eventFlow.close()
+ driver.cancel()
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index b4626def..c21a9fc0 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -65,8 +65,8 @@ import kotlin.math.min
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class SimpleVirtDriver(
private val hostContext: ServerContext,
- private val coroutineScope: CoroutineScope
-) : VirtDriver {
+ scope: CoroutineScope
+) : VirtDriver, CoroutineScope by scope {
/**
* The [Server] on which this hypervisor runs.
*/
@@ -98,7 +98,7 @@ class SimpleVirtDriver(
it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
performanceModel?.computeIntersectingItems(imagesRunning)
}
- }.launchIn(coroutineScope)
+ }.launchIn(this)
}
override suspend fun spawn(
@@ -123,6 +123,10 @@ class SimpleVirtDriver(
return server
}
+ internal fun cancel() {
+ eventFlow.close()
+ }
+
/**
* A flag to indicate the driver is stopped.
*/
@@ -141,7 +145,7 @@ class SimpleVirtDriver(
/**
* Schedule the vCPUs on the physical CPUs.
*/
- private suspend fun reschedule() {
+ private fun reschedule() {
flush()
// Do not schedule a call if there is no work to schedule or the driver stopped.
@@ -149,7 +153,7 @@ class SimpleVirtDriver(
return
}
- val call = coroutineScope.launch {
+ val call = launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -219,7 +223,7 @@ class SimpleVirtDriver(
val fraction = req.allocatedUsage / totalUsage
// Derive the burst that was allocated to this vCPU
- val allocatedBurst = ceil(duration * req.allocatedUsage).toLong()
+ val allocatedBurst = ceil(totalBurst * fraction).toLong()
// Compute the burst time that the VM was actually granted
val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
@@ -244,6 +248,9 @@ class SimpleVirtDriver(
server
)
)
+
+ // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
+ reschedule()
}
this.call = call
}
@@ -286,7 +293,7 @@ class SimpleVirtDriver(
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = coroutineScope.launch {
+ internal val job: Job = launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -331,8 +338,8 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
- events.close()
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
+ events.close()
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
index c5189764..f363bf45 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/CorrelatedFaultInjector.kt
@@ -44,6 +44,8 @@ public class CorrelatedFaultInjector(
private val iatShape: Double,
private val sizeScale: Double,
private val sizeShape: Double,
+ private val dScale: Double,
+ private val dShape: Double,
random: Random = Random(0)
) : FaultInjector {
/**
@@ -84,7 +86,7 @@ public class CorrelatedFaultInjector(
}
job = this.domain.launch {
- while (true) {
+ while (active.isNotEmpty()) {
ensureActive()
// Make sure to convert delay from hours to milliseconds
@@ -98,10 +100,28 @@ public class CorrelatedFaultInjector(
delay(d.toLong())
val n = lognvariate(sizeScale, sizeShape).toInt()
- for (failureDomain in active.shuffled(random).take(n)) {
+ val targets = active.shuffled(random).take(n)
+ for (failureDomain in targets) {
+ active -= failureDomain
failureDomain.fail()
}
+
+ val df = lognvariate(dScale, dShape) * 3600 * 1e6
+
+ // Handle long overflow
+ if (simulationContext.clock.millis() + df <= 0) {
+ return@launch
+ }
+
+ delay(df.toLong())
+
+ for (failureDomain in targets) {
+ failureDomain.recover()
+ enqueue(failureDomain)
+ }
}
+
+ job = null
}
}
diff --git a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
index 91ca9b83..d56df3c9 100644
--- a/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
+++ b/opendc/opendc-core/src/main/kotlin/com/atlarge/opendc/core/failure/FailureDomain.kt
@@ -39,4 +39,9 @@ public interface FailureDomain {
* Fail the domain externally.
*/
public suspend fun fail()
+
+ /**
+ * Resume the failure domain.
+ */
+ public suspend fun recover()
}
diff --git a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
index cc403e6e..400cef33 100644
--- a/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
+++ b/opendc/opendc-experiments-sc20/src/main/kotlin/com/atlarge/opendc/experiments/sc20/TestExperiment.kt
@@ -52,6 +52,7 @@ import com.fasterxml.jackson.module.kotlin.readValue
import com.xenomachina.argparser.ArgParser
import com.xenomachina.argparser.default
import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
@@ -63,6 +64,7 @@ import java.io.File
import java.io.FileReader
import java.util.ServiceLoader
import kotlin.math.max
+import kotlin.random.Random
class ExperimentParameters(parser: ArgParser) {
val traceDirectory by parser.storing("path to the trace directory")
@@ -100,11 +102,13 @@ class ExperimentParameters(parser: ArgParser) {
/**
* Obtain the [FaultInjector] to use for the experiments.
*/
-fun createFaultInjector(domain: Domain): FaultInjector {
+fun createFaultInjector(domain: Domain, random: Random): FaultInjector {
// Parameters from A. Iosup, A Framework for the Study of Grid Inter-Operation Mechanisms, 2009
return CorrelatedFaultInjector(domain,
iatScale = -1.39, iatShape = 1.03,
- sizeScale = 1.88, sizeShape = 1.25
+ sizeScale = 1.88, sizeShape = 1.25,
+ dScale = 1.88, dShape = 1.25,
+ random = random
)
}
@@ -202,18 +206,22 @@ fun main(args: Array<String>) {
.launchIn(this)
}
- if (failures) {
- println("ENABLE Failures")
- root.newDomain(name = "failures").launch {
+ val failureDomain = if (failures) {
+ println("ENABLING failures")
+ val domain = root.newDomain(name = "failures")
+ domain.launch {
chan.receive()
+ val random = Random(0)
val injectors = mutableMapOf<String, FaultInjector>()
-
for (node in bareMetalProvisioner.nodes()) {
val cluster = node.metadata[NODE_CLUSTER] as String
- val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain) }
+ val injector = injectors.getOrPut(cluster) { createFaultInjector(simulationContext.domain, random) }
injector.enqueue(node.metadata["driver"] as FailureDomain)
}
}
+ domain
+ } else {
+ null
}
val running = mutableSetOf<Server>()
@@ -250,6 +258,7 @@ fun main(args: Array<String>) {
finish.receive()
scheduler.terminate()
+ failureDomain?.cancel()
println(simulationContext.clock.instant())
}