diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-13 20:39:00 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:41:21 +0100 |
| commit | afe62e3cb8e2050544b4df0f8bbf071abe0e8dce (patch) | |
| tree | 5a430bf28fa1573dbb6e708c03dec65fc792791b /opendc/opendc-compute/src | |
| parent | 59a7470853957d6055c120e9bf8658b4b7b48879 (diff) | |
feat: Propagate machine failure to VM
Diffstat (limited to 'opendc/opendc-compute/src')
5 files changed, 59 insertions, 42 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 1596b3b9..e77b55a6 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -27,8 +27,6 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.isActive -import java.lang.Exception import java.util.UUID import kotlin.coroutines.coroutineContext import kotlin.math.min diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 5f5dfb66..fbc5c0ce 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -168,7 +168,6 @@ public class SimpleBareMetalDriver( override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } private inner class BareMetalServerContext : ServerManagementContext { - private val job: Job private var finalized: Boolean = false override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus @@ -176,15 +175,13 @@ public class SimpleBareMetalDriver( override val server: Server get() = node.server!! - init { - job = domain.launch { - init() - try { - server.image(this@BareMetalServerContext) - exit() - } catch (cause: Throwable) { - exit(cause) - } + private val job = domain.launch { + init() + try { + server.image(this@BareMetalServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) } } @@ -200,6 +197,8 @@ public class SimpleBareMetalDriver( } override suspend fun init() { + assert(!finalized) { "Machine is already finalized" } + val server = server.copy(state = ServerState.ACTIVE) node = node.copy(state = NodeState.ACTIVE, server = server) } @@ -259,7 +258,7 @@ public class SimpleBareMetalDriver( val end = simulationContext.clock.millis() // Flush the load if the do not receive a new run call for the same timestamp - flush = domain.launch { + flush = domain.launch(job) { delay(1) usageSignal.value = 0.0 } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt index 8d055953..0f4d3c15 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt @@ -29,6 +29,7 @@ import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.virt.driver.VirtDriver import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.suspendCancellableCoroutine import java.util.UUID @@ -43,11 +44,12 @@ class HypervisorImage( override val tags: TagContainer = emptyMap() override suspend fun invoke(ctx: ServerContext) { - val driver = HypervisorVirtDriver(ctx, hypervisorMonitor) + coroutineScope { + val driver = HypervisorVirtDriver(ctx, hypervisorMonitor, this) + ctx.publishService(VirtDriver.Key, driver) - ctx.publishService(VirtDriver.Key, driver) - - // Suspend image until it is cancelled - suspendCancellableCoroutine<Unit> {} + // Suspend image until it is cancelled + suspendCancellableCoroutine<Unit> {} + } } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 8bce7d9d..1ff33c0c 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -24,7 +24,7 @@ package com.atlarge.opendc.compute.virt.driver.hypervisor -import com.atlarge.odcsim.SimulationContext +import com.atlarge.odcsim.Domain import com.atlarge.odcsim.simulationContext import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.ProcessingUnit @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor @@ -41,7 +42,9 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL import com.atlarge.opendc.core.workload.PerformanceInterferenceModel import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import java.util.UUID @@ -54,7 +57,8 @@ import kotlin.math.min */ class HypervisorVirtDriver( private val hostContext: ServerContext, - private val monitor: HypervisorMonitor + private val monitor: HypervisorMonitor, + private val coroutineScope: CoroutineScope ) : VirtDriver { /** * A set for tracking the VM context objects. @@ -80,7 +84,7 @@ class HypervisorVirtDriver( val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD) availableMemory -= requiredMemory - vms.add(VmServerContext(server, monitor, simulationContext)) + vms.add(VmServerContext(server, monitor, simulationContext.domain)) monitors.forEach { it.onUpdate(vms.size, availableMemory) } return server } @@ -94,6 +98,11 @@ class HypervisorVirtDriver( } /** + * A flag to indicate the driver is stopped. + */ + private var stopped: Boolean = false + + /** * The set of [VmServerContext] instances that is being scheduled at the moment. */ private val activeVms = mutableSetOf<VmServerContext>() @@ -109,12 +118,12 @@ class HypervisorVirtDriver( private suspend fun reschedule() { flush() - // Do not schedule a call if there is no work to schedule - if (activeVms.isEmpty()) { + // Do not schedule a call if there is no work to schedule or the driver stopped. + if (stopped || activeVms.isEmpty()) { return } - val call = simulationContext.domain.launch { + val call = coroutineScope.launch { val start = simulationContext.clock.millis() val vms = activeVms.toSet() @@ -210,17 +219,17 @@ class HypervisorVirtDriver( ) } this.call = call - call.invokeOnCompletion { this.call = null } } /** * Flush the progress of the current active VMs. */ - private fun flush() { + private suspend fun flush() { val call = call ?: return // If there is no active call, there is nothing to flush - // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its + // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its // completion. - call.cancel() + call.cancelAndJoin() + this.call = null } /** @@ -241,15 +250,16 @@ class HypervisorVirtDriver( internal inner class VmServerContext( override var server: Server, val monitor: ServerMonitor, - ctx: SimulationContext + val domain: Domain ) : ServerManagementContext { + private var finalized: Boolean = false lateinit var requests: List<CpuRequest> lateinit var burst: LongArray var deadline: Long = 0L var chan = Channel<Unit>(Channel.RENDEZVOUS) private var initialized: Boolean = false - internal val job: Job = ctx.domain.launch { + internal val job: Job = coroutineScope.launch { init() try { server.image(this@VmServerContext) @@ -259,27 +269,36 @@ class HypervisorVirtDriver( } } + private suspend fun setServer(value: Server) { + val field = server + if (field.state != value.state) { + monitor.onUpdate(value, field.state) + } + + server = value + } + override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount) override suspend fun init() { - if (initialized) { - throw IllegalStateException() - } + assert(!finalized) { "VM is already finalized" } - val previousState = server.state - server = server.copy(state = ServerState.ACTIVE) - monitor.onUpdate(server, previousState) + setServer(server.copy(state = ServerState.ACTIVE)) initialized = true } override suspend fun exit(cause: Throwable?) { - val previousState = server.state - val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - server = server.copy(state = state) + finalized = true + + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + setServer(server.copy(state = serverState)) availableMemory += server.flavor.memorySize - monitor.onUpdate(server, previousState) - initialized = false vms.remove(this) + monitors.forEach { it.onUpdate(vms.size, availableMemory) } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index b78c0b8c..166e93b8 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -34,7 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.NodeState import com.atlarge.opendc.compute.metal.monitor.NodeMonitor -import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext |
