summary | refs | log | tree | commit | diff
path: root/opendc/opendc-compute/src
diff options
context:
space:
mode:
author: Georgios Andreadis <g.andreadis@student.tudelft.nl> 2020-04-03 22:13:35 +0200
committer: Georgios Andreadis <g.andreadis@student.tudelft.nl> 2020-04-03 22:13:35 +0200
commit: 6fc21e6df6345c6ec029cc50674352949af83510 (patch)
tree: ce62c5f919fb45e7bf00800bab86defb3521a18c /opendc/opendc-compute/src
parent: a625066b997cfeeb31c88dddeb17fc67ea75d6e6 (diff)
parent: 1f67deb18d1430931aec955e7c129cb0d714718c (diff)
Merge branch 'feat/failure-recovery' into '2.x'
Add initial prototype for failure recovery. See merge request opendc/opendc-simulator!51
Diffstat (limited to 'opendc/opendc-compute/src')
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt | 32
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt | 2
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt | 23
-rw-r--r-- opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt | 37
4 files changed, 66 insertions, 28 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
index 9396699a..37ae9eb5 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt
@@ -49,7 +49,6 @@ import com.atlarge.opendc.core.services.ServiceRegistry
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
-import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
@@ -182,6 +181,10 @@ public class SimpleBareMetalDriver(
private inner class BareMetalServerContext(val events: EventFlow<ServerEvent>) : ServerManagementContext {
private var finalized: Boolean = false
+ // A state in which the machine is still available, but does not run any of the work requested by the
+ // image
+ var unavailable = false
+
override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus
override val server: Server
@@ -266,7 +269,9 @@ public class SimpleBareMetalDriver(
}
}
- usageState.value = totalUsage / cpus.size
+ if (!unavailable) {
+ usageState.value = totalUsage / cpus.size
+ }
try {
delay(duration)
@@ -276,7 +281,7 @@ public class SimpleBareMetalDriver(
}
val end = simulationContext.clock.millis()
- // Flush the load if the do not receive a new run call for the same timestamp
+ // Flush the load if they do not receive a new run call for the same timestamp
flush = domain.launch(job) {
delay(1)
usageState.value = 0.0
@@ -285,6 +290,10 @@ public class SimpleBareMetalDriver(
flush = null
}
+ if (unavailable) {
+ return
+ }
+
// Write back the remaining burst time
for (i in 0 until min(cpus.size, burst.size)) {
val usage = min(limit[i], cpus[i].frequency)
@@ -298,12 +307,17 @@ public class SimpleBareMetalDriver(
get() = domain
override suspend fun fail() {
- try {
- serverContext?.cancel(fail = true)
- domain.cancel()
- } catch (_: CancellationException) {
- // Ignore if the machine has already failed.
- }
+ serverContext?.unavailable = true
+
+ val server = nodeState.value.server?.copy(state = ServerState.ERROR)
+ setNode(nodeState.value.copy(state = NodeState.ERROR, server = server))
+ }
+
+ override suspend fun recover() {
+ serverContext?.unavailable = false
+
+ val server = nodeState.value.server?.copy(state = ServerState.ACTIVE)
+ setNode(nodeState.value.copy(state = NodeState.ACTIVE, server = server))
}
override fun toString(): String = "SimpleBareMetalDriver(node = ${nodeState.value.uid})"
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
index c21b002d..bd395f0d 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/HypervisorImage.kt
@@ -50,7 +50,7 @@ object HypervisorImage : Image {
try {
suspendCancellableCoroutine<Unit> {}
} finally {
- driver.eventFlow.close()
+ driver.cancel()
}
}
}
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
index b4626def..c21a9fc0 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/SimpleVirtDriver.kt
@@ -65,8 +65,8 @@ import kotlin.math.min
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
class SimpleVirtDriver(
private val hostContext: ServerContext,
- private val coroutineScope: CoroutineScope
-) : VirtDriver {
+ scope: CoroutineScope
+) : VirtDriver, CoroutineScope by scope {
/**
* The [Server] on which this hypervisor runs.
*/
@@ -98,7 +98,7 @@ class SimpleVirtDriver(
it.server.image.tags[IMAGE_PERF_INTERFERENCE_MODEL] as? PerformanceInterferenceModel?
performanceModel?.computeIntersectingItems(imagesRunning)
}
- }.launchIn(coroutineScope)
+ }.launchIn(this)
}
override suspend fun spawn(
@@ -123,6 +123,10 @@ class SimpleVirtDriver(
return server
}
+ internal fun cancel() {
+ eventFlow.close()
+ }
+
/**
* A flag to indicate the driver is stopped.
*/
@@ -141,7 +145,7 @@ class SimpleVirtDriver(
/**
* Schedule the vCPUs on the physical CPUs.
*/
- private suspend fun reschedule() {
+ private fun reschedule() {
flush()
// Do not schedule a call if there is no work to schedule or the driver stopped.
@@ -149,7 +153,7 @@ class SimpleVirtDriver(
return
}
- val call = coroutineScope.launch {
+ val call = launch {
val start = simulationContext.clock.millis()
val vms = activeVms.toSet()
@@ -219,7 +223,7 @@ class SimpleVirtDriver(
val fraction = req.allocatedUsage / totalUsage
// Derive the burst that was allocated to this vCPU
- val allocatedBurst = ceil(duration * req.allocatedUsage).toLong()
+ val allocatedBurst = ceil(totalBurst * fraction).toLong()
// Compute the burst time that the VM was actually granted
val grantedBurst = (performanceScore * (allocatedBurst - ceil(totalRemainder * fraction))).toLong()
@@ -244,6 +248,9 @@ class SimpleVirtDriver(
server
)
)
+
+ // Make sure we reschedule the remaining amount of work (if we did not obtain the entire request)
+ reschedule()
}
this.call = call
}
@@ -286,7 +293,7 @@ class SimpleVirtDriver(
var chan = Channel<Unit>(Channel.RENDEZVOUS)
private var initialized: Boolean = false
- internal val job: Job = coroutineScope.launch {
+ internal val job: Job = launch {
delay(1) // TODO Introduce boot time
init()
try {
@@ -331,8 +338,8 @@ class SimpleVirtDriver(
server = server.copy(state = serverState)
availableMemory += server.flavor.memorySize
vms.remove(this)
- events.close()
eventFlow.emit(HypervisorEvent.VmsUpdated(this@SimpleVirtDriver, vms.size, availableMemory))
+ events.close()
}
override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) {
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
index c7347783..85bdc438 100644
--- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
+++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/service/SimpleVirtProvisioningService.kt
@@ -122,7 +122,7 @@ class SimpleVirtProvisioningService(
val requiredMemory = (imageInstance.image as VmImage).requiredMemory
val selectedHv = allocationLogic.select(availableHypervisors, imageInstance) ?: break
try {
- log.info("Spawning ${imageInstance.image} on ${selectedHv.server}")
+ log.info("Spawning ${imageInstance.image} on ${selectedHv.server} ${availableHypervisors.size}")
incomingImages -= imageInstance
// Speculatively update the hypervisor view information to prevent other images in the queue from
@@ -139,6 +139,19 @@ class SimpleVirtProvisioningService(
imageInstance.server = server
imageInstance.continuation.resume(server)
activeImages += imageInstance
+
+ server.events
+ .onEach { event ->
+ when (event) {
+ is ServerEvent.StateChanged -> {
+ if (event.server.state == ServerState.SHUTOFF) {
+ activeImages -= imageInstance
+ selectedHv.provisionedCores -= server.flavor.cpuCount
+ }
+ }
+ }
+ }
+ .launchIn(this)
} catch (e: InsufficientMemoryOnServerException) {
println("Unable to deploy image due to insufficient memory")
@@ -152,18 +165,22 @@ class SimpleVirtProvisioningService(
private fun stateChanged(server: Server) {
when (server.state) {
ServerState.ACTIVE -> {
- val hv = HypervisorView(
- server.uid,
- server,
- 0,
- server.flavor.memorySize,
- 0
- )
- hypervisors[server] = hv
+ if (server in hypervisors) {
+ // Corner case for when the hypervisor already exists
+ availableHypervisors += hypervisors.getValue(server)
+ } else {
+ val hv = HypervisorView(
+ server.uid,
+ server,
+ 0,
+ server.flavor.memorySize,
+ 0
+ )
+ hypervisors[server] = hv
+ }
}
ServerState.SHUTOFF, ServerState.ERROR -> {
val hv = hypervisors[server] ?: return
- hv.provisionedCores -= server.flavor.cpuCount
availableHypervisors -= hv
requestCycle()
}