summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-13 20:39:00 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-03-25 10:41:21 +0100
commitafe62e3cb8e2050544b4df0f8bbf071abe0e8dce (patch)
tree5a430bf28fa1573dbb6e708c03dec65fc792791b /opendc/opendc-compute/src
parent59a7470853957d6055c120e9bf8658b4b7b48879 (diff)
feat: Propagate machine failure to VM
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt21
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt12
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt65
-rw-r--r--opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt1
5 files changed, 59 insertions, 42 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
index 1596b3b9..e77b55a6 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt
@@ -27,8 +27,6 @@ package com.atlarge.opendc.compute.core.image
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.core.resource.TagContainer
import kotlinx.coroutines.ensureActive
-import kotlinx.coroutines.isActive
-import java.lang.Exception
import java.util.UUID
import kotlin.coroutines.coroutineContext
import kotlin.math.min
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 5f5dfb66..fbc5c0ce 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -168,7 +168,6 @@ public class SimpleBareMetalDriver(
override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node }
private inner class BareMetalServerContext : ServerManagementContext {
- private val job: Job
private var finalized: Boolean = false
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
@@ -176,15 +175,13 @@ public class SimpleBareMetalDriver(
override val server: Server
get() = node.server!!
- init {
- job = domain.launch {
- init()
- try {
- server.image(this@BareMetalServerContext)
- exit()
- } catch (cause: Throwable) {
- exit(cause)
- }
+ private val job = domain.launch {
+ init()
+ try {
+ server.image(this@BareMetalServerContext)
+ exit()
+ } catch (cause: Throwable) {
+ exit(cause)
}
}
@@ -200,6 +197,8 @@ public class SimpleBareMetalDriver(
}
override suspend fun init() {
+ assert(!finalized) { "Machine is already finalized" }
+
val server = server.copy(state = ServerState.ACTIVE)
node = node.copy(state = NodeState.ACTIVE, server = server)
}
@@ -259,7 +258,7 @@ public class SimpleBareMetalDriver(
val end = simulationContext.clock.millis()
// Flush the load if the do not receive a new run call for the same timestamp
- flush = domain.launch {
+ flush = domain.launch(job) {
delay(1)
usageSignal.value = 0.0
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
index 8d055953..0f4d3c15 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorImage.kt
@@ -29,6 +29,7 @@ import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.virt.driver.VirtDriver
import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.core.resource.TagContainer
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.UUID
@@ -43,11 +44,12 @@ class HypervisorImage(
override val tags: TagContainer = emptyMap()
override suspend fun invoke(ctx: ServerContext) {
- val driver = HypervisorVirtDriver(ctx, hypervisorMonitor)
+ coroutineScope {
+ val driver = HypervisorVirtDriver(ctx, hypervisorMonitor, this)
+ ctx.publishService(VirtDriver.Key, driver)
- ctx.publishService(VirtDriver.Key, driver)
-
- // Suspend image until it is cancelled
- suspendCancellableCoroutine<Unit> {}
+ // Suspend image until it is cancelled
+ suspendCancellableCoroutine<Unit> {}
+ }
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
index 8bce7d9d..1ff33c0c 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt
@@ -24,7 +24,7 @@
package com.atlarge.opendc.compute.virt.driver.hypervisor
-import com.atlarge.odcsim.SimulationContext
+import com.atlarge.odcsim.Domain
import com.atlarge.odcsim.simulationContext
import com.atlarge.opendc.compute.core.Flavor
import com.atlarge.opendc.compute.core.ProcessingUnit
@@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.Server
import com.atlarge.opendc.compute.core.ServerState
import com.atlarge.opendc.compute.core.execution.ServerContext
import com.atlarge.opendc.compute.core.execution.ServerManagementContext
+import com.atlarge.opendc.compute.core.execution.ShutdownException
import com.atlarge.opendc.compute.core.execution.assertFailure
import com.atlarge.opendc.compute.core.image.Image
import com.atlarge.opendc.compute.core.monitor.ServerMonitor
@@ -41,7 +42,9 @@ import com.atlarge.opendc.compute.virt.monitor.HypervisorMonitor
import com.atlarge.opendc.core.workload.IMAGE_PERF_INTERFERENCE_MODEL
import com.atlarge.opendc.core.workload.PerformanceInterferenceModel
import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import java.util.UUID
@@ -54,7 +57,8 @@ import kotlin.math.min
*/
class HypervisorVirtDriver(
private val hostContext: ServerContext,
- private val monitor: HypervisorMonitor
+ private val monitor: HypervisorMonitor,
+ private val coroutineScope: CoroutineScope
) : VirtDriver {
/**
* A set for tracking the VM context objects.
@@ -80,7 +84,7 @@ class HypervisorVirtDriver(
val server = Server(UUID.randomUUID(), "<unnamed>", emptyMap(), flavor, image, ServerState.BUILD)
availableMemory -= requiredMemory
- vms.add(VmServerContext(server, monitor, simulationContext))
+ vms.add(VmServerContext(server, monitor, simulationContext.domain))
monitors.forEach { it.onUpdate(vms.size, availableMemory) }
return server
}
@@ -94,6 +98,11 @@ class HypervisorVirtDriver(
}
/**
+ * A flag to indicate the driver is stopped.
+ */
+ private var stopped: Boolean = false
+
+ /**
* The set of [VmServerContext] instances that is being scheduled at the moment.
*/
private val activeVms = mutableSetOf<VmServerContext>()
@@ -109,12 +118,12 @@ class HypervisorVirtDriver(
private suspend fun reschedule() {
flush()
- // Do not schedule a call if there is no work to schedule
- if (activeVms.isEmpty()) {
+ // Do not schedule a call if there is no work to schedule or the driver stopped.
+ if (stopped || activeVms.isEmpty()) {
return
}
- val call = simulationContext.domain.launch {
+ val call = coroutineScope.launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -210,17 +219,17 @@ class HypervisorVirtDriver(
)
}
this.call = call
- call.invokeOnCompletion { this.call = null }
}
/**
* Flush the progress of the current active VMs.
*/
- private fun flush() {
+ private suspend fun flush() {
val call = call ?: return // If there is no active call, there is nothing to flush
- // The progress is actually flushed in the coroutine when it notices we cancel it and wait for its
+ // The progress is actually flushed in the coroutine when it notices: we cancel it and wait for its
// completion.
- call.cancel()
+ call.cancelAndJoin()
+ this.call = null
}
/**
@@ -241,15 +250,16 @@ class HypervisorVirtDriver(
internal inner class VmServerContext(
override var server: Server,
val monitor: ServerMonitor,
- ctx: SimulationContext
+ val domain: Domain
) : ServerManagementContext {
+ private var finalized: Boolean = false
lateinit var requests: List<CpuRequest>
lateinit var burst: LongArray
var deadline: Long = 0L
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = ctx.domain.launch {
+ internal val job: Job = coroutineScope.launch {
init()
try {
server.image(this@VmServerContext)
@@ -259,27 +269,36 @@ class HypervisorVirtDriver(
}
}
+ private suspend fun setServer(value: Server) {
+ val field = server
+ if (field.state != value.state) {
+ monitor.onUpdate(value, field.state)
+ }
+
+ server = value
+ }
+
override val cpus: List<ProcessingUnit> = hostContext.cpus.take(server.flavor.cpuCount)
override suspend fun init() {
- if (initialized) {
- throw IllegalStateException()
- }
+ assert(!finalized) { "VM is already finalized" }
- val previousState = server.state
- server = server.copy(state = ServerState.ACTIVE)
- monitor.onUpdate(server, previousState)
+ setServer(server.copy(state = ServerState.ACTIVE))
initialized = true
}
override suspend fun exit(cause: Throwable?) {
- val previousState = server.state
- val state = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR
- server = server.copy(state = state)
+ finalized = true
+
+ val serverState =
+ if (cause == null || (cause is ShutdownException && cause.cause == null))
+ ServerState.SHUTOFF
+ else
+ ServerState.ERROR
+ setServer(server.copy(state = serverState))
availableMemory += server.flavor.memorySize
- monitor.onUpdate(server, previousState)
- initialized = false
vms.remove(this)
+
monitors.forEach { it.onUpdate(vms.size, availableMemory) }
}
diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
index b78c0b8c..166e93b8 100644
--- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
+++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt
@@ -34,7 +34,6 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage
import com.atlarge.opendc.compute.metal.Node
import com.atlarge.opendc.compute.metal.NodeState
import com.atlarge.opendc.compute.metal.monitor.NodeMonitor
-import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext