summaryrefslogtreecommitdiff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-03 17:05:05 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-04-03 17:05:05 +0200
commitc4016fcfd37550b237f6940eaffb5b4efd607601 (patch)
tree1abe286499e20da066bdf9c1dca778fe78ce6017 /opendc/opendc-compute/src
parenta625066b997cfeeb31c88dddeb17fc67ea75d6e6 (diff)
feat: Add initial prototype for failure recovery
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt26
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt2
-rw-r--r--opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt23
3 files changed, 33 insertions, 18 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 9396699a..cec1d1a7 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -182,6 +181,10 @@ public class SimpleBareMetalDriver(
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
+ // A state in which the machine is still available, but does not run any of the work requested by the
+ // image
+ var unavailable = false
+
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override val server: Server
@@ -266,7 +269,9 @@ public class SimpleBareMetalDriver(
}
}
- usageState.value = totalUsage / cpus.size
+ if (!unavailable) {
+ usageState.value = totalUsage / cpus.size
+ }
try {
delay(duration)
@@ -276,7 +281,7 @@ public class SimpleBareMetalDriver(
}
val end = simulationContext.clock.millis()
- // Flush the load if the do not receive a new run call for the same timestamp
+ // Flush the load if they do not receive a new run call for the same timestamp
flush = domain.launch(job) {
delay(1)
usageState.value = 0.0
@@ -285,6 +290,10 @@ public class SimpleBareMetalDriver(
flush = null
}
+ if (unavailable) {
+ return
+ }
+
// Write back the remaining burst time
for (i in 0 until min(cpus.size, burst.size)) {
val usage = min(limit[i], cpus[i].frequency)
@@ -298,12 +307,11 @@ public class SimpleBareMetalDriver(
get() = domain
override suspend fun fail() {
- try {
- serverContext?.cancel(fail = true)
- domain.cancel()
- } catch (_: CancellationException) {
- // Ignore if the machine has already failed.
- }
+ // serverContext?.unavailable = true
+ }
+
+ override suspend fun recover() {
+ // serverContext?.unavailable = false
}
override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index c21b002d..bd395f0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -50,7 +50,7 @@ object HypervisorImage : Image {
try {
suspendCancellableCoroutine<Unit> {}
} finally {
- driver.eventFlow.close()
+ driver.cancel()
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index b4626def..c21a9fc0 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -65,8 +65,8 @@ import kotlin.math.min
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class SimpleVirtDriver(
private val hostContext: ServerContext,
- private val coroutineScope: CoroutineScope
-) : VirtDriver {
+ scope: CoroutineScope
+) : VirtDriver, CoroutineScope by scope {
/**
* The [Server] on which this hypervisor runs.
*/
@@ -98,7 +98,7 @@ class SimpleVirtDriver(
it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
performanceModel?.computeIntersectingItems(imagesRunning)
}
- }.launchIn(coroutineScope)
+ }.launchIn(this)
}
override suspend fun spawn(
@@ -123,6 +123,10 @@ class SimpleVirtDriver(
return server
}
+ internal fun cancel() {
+ eventFlow.close()
+ }
+
/**
* A flag to indicate the driver is stopped.
*/
@@ -141,7 +145,7 @@ class SimpleVirtDriver(
/**
* Schedule the vCPUs on the physical CPUs.
*/
- private suspend fun reschedule() {
+ private fun reschedule() {
flush()
// Do not schedule a call if there is no work to schedule or the driver stopped.
@@ -149,7 +153,7 @@ class SimpleVirtDriver(
return
}
- val call = coroutineScope.launch {
+ val call = launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -219,7 +223,7 @@ class SimpleVirtDriver(
val fraction = req.allocatedUsage / totalUsage
// Derive the burst that was allocated to this vCPU
- val allocatedBurst = ceil(duration * req.allocatedUsage).toLong()
+ val allocatedBurst = ceil(totalBurst * fraction).toLong()
// Compute the burst time that the VM was actually granted
val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
@@ -244,6 +248,9 @@ class SimpleVirtDriver(
server
)
)
+
+ // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
+ reschedule()
}
this.call = call
}
@@ -286,7 +293,7 @@ class SimpleVirtDriver(
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = coroutineScope.launch {
+ internal val job: Job = launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -331,8 +338,8 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
- events.close()
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
+ events.close()
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {