diff options
| | | |
|---|---|---|
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-03 17:05:05 +0200 |
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-04-03 17:05:05 +0200 |
| commit | c4016fcfd37550b237f6940eaffb5b4efd607601 (patch) | |
| tree | 1abe286499e20da066bdf9c1dca778fe78ce6017 /opendc/opendc-compute/src | |
| parent | a625066b997cfeeb31c88dddeb17fc67ea75d6e6 (diff) | |
feat: Add initial prototype for failure recovery
Diffstat (limited to 'opendc/opendc-compute/src')
3 files changed, 33 insertions, 18 deletions
```diff
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 9396699a..cec1d1a7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry
 import kotlinx.coroutines.CancellationException
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.launch
@@ -182,6 +181,10 @@ public class SimpleBareMetalDriver(
     private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
         private var finalized: Boolean = false
 
+        // A state in which the machine is still available, but does not run any of the work requested by the
+        // image
+        var unavailable = false
+
         override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
 
         override val server: Server
@@ -266,7 +269,9 @@ public class SimpleBareMetalDriver(
                 }
             }
 
-            usageState.value = totalUsage / cpus.size
+            if (!unavailable) {
+                usageState.value = totalUsage / cpus.size
+            }
 
             try {
                 delay(duration)
@@ -276,7 +281,7 @@ public class SimpleBareMetalDriver(
             }
             val end = simulationContext.clock.millis()
 
-            // Flush the load if the do not receive a new run call for the same timestamp
+            // Flush the load if they do not receive a new run call for the same timestamp
             flush = domain.launch(job) {
                 delay(1)
                 usageState.value = 0.0
@@ -285,6 +290,10 @@ public class SimpleBareMetalDriver(
                 flush = null
             }
 
+            if (unavailable) {
+                return
+            }
+
             // Write back the remaining burst time
             for (i in 0 until min(cpus.size, burst.size)) {
                 val usage = min(limit[i], cpus[i].frequency)
@@ -298,12 +307,11 @@ public class SimpleBareMetalDriver(
         get() = domain
 
     override suspend fun fail() {
-        try {
-            serverContext?.cancel(fail = true)
-            domain.cancel()
-        } catch (_: CancellationException) {
-            // Ignore if the machine has already failed.
-        }
+        // serverContext?.unavailable = true
+    }
+
+    override suspend fun recover() {
+        // serverContext?.unavailable = false
     }
 
     override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index c21b002d..bd395f0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -50,7 +50,7 @@ object HypervisorImage : Image {
         try {
             suspendCancellableCoroutine<Unit> {}
         } finally {
-            driver.eventFlow.close()
+            driver.cancel()
         }
     }
 }
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index b4626def..c21a9fc0 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -65,8 +65,8 @@ import kotlin.math.min
 @OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
 class SimpleVirtDriver(
     private val hostContext: ServerContext,
-    private val coroutineScope: CoroutineScope
-) : VirtDriver {
+    scope: CoroutineScope
+) : VirtDriver, CoroutineScope by scope {
     /**
      * The [Server] on which this hypervisor runs.
      */
@@ -98,7 +98,7 @@ class SimpleVirtDriver(
                     it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
                 performanceModel?.computeIntersectingItems(imagesRunning)
             }
-        }.launchIn(coroutineScope)
+        }.launchIn(this)
     }
 
     override suspend fun spawn(
@@ -123,6 +123,10 @@ class SimpleVirtDriver(
         return server
     }
 
+    internal fun cancel() {
+        eventFlow.close()
+    }
+
     /**
      * A flag to indicate the driver is stopped.
      */
@@ -141,7 +145,7 @@ class SimpleVirtDriver(
     /**
      * Schedule the vCPUs on the physical CPUs.
      */
-    private suspend fun reschedule() {
+    private fun reschedule() {
         flush()
 
         // Do not schedule a call if there is no work to schedule or the driver stopped.
@@ -149,7 +153,7 @@ class SimpleVirtDriver(
             return
         }
 
-        val call = coroutineScope.launch {
+        val call = launch {
             val start = simulationContext.clock.millis()
             val vms = activeVms.toSet()
 
@@ -219,7 +223,7 @@ class SimpleVirtDriver(
                     val fraction = req.allocatedUsage / totalUsage
 
                     // Derive the burst that was allocated to this vCPU
-                    val allocatedBurst = ceil(duration * req.allocatedUsage).toLong()
+                    val allocatedBurst = ceil(totalBurst * fraction).toLong()
 
                     // Compute the burst time that the VM was actually granted
                     val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
@@ -244,6 +248,9 @@ class SimpleVirtDriver(
                     server
                 )
             )
+
+            // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
+            reschedule()
         }
         this.call = call
     }
@@ -286,7 +293,7 @@ class SimpleVirtDriver(
         var chan = Channel<Unit>(Channel.RENDEZVOUS)
         private var initialized: Boolean = false
 
-        internal val job: Job = coroutineScope.launch {
+        internal val job: Job = launch {
             delay(1) // TODO Introduce boot time
             init()
             try {
@@ -331,8 +338,8 @@ class SimpleVirtDriver(
             server = server.copy(state = serverState)
             availableMemory += server.flavor.memorySize
             vms.remove(this)
-            events.close()
             eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
+            events.close()
         }
 
         override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
```
