diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-13 15:50:58 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-03-25 10:41:21 +0100 |
| commit | 59a7470853957d6055c120e9bf8658b4b7b48879 (patch) | |
| tree | e5682bf7e875361965bf0ac1beb9942138759140 /opendc/opendc-compute/src | |
| parent | 99cc96fc51f1b894c8c05b1cde69d60463cc732c (diff) | |
feat: Add infrastructure for failures
Diffstat (limited to 'opendc/opendc-compute/src')
6 files changed, 123 insertions, 44 deletions
diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt new file mode 100644 index 00000000..abf6f8db --- /dev/null +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/execution/ShutdownException.kt @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.compute.core.execution + +import kotlinx.coroutines.CancellationException + +/** + * This exception is thrown by the underlying [ServerContext] to indicate that a shutdown signal + * has been sent to the server. + */ +public class ShutdownException(message: String? = null, override val cause: Throwable? = null) : CancellationException(message) + +/** + * This method terminates the current active coroutine if the specified [CancellationException] is caused + * by a shutdown. + */ +public fun CancellationException.assertShutdown() { + if (this is ShutdownException) { + throw this + } +} + +/** + * This method terminates the current active coroutine if the specified [CancellationException] is caused + * by a failure. + */ +public fun CancellationException.assertFailure() { + if (this is ShutdownException && cause != null) { + throw this + } +} diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt index 107237ea..1596b3b9 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/core/image/FlopsApplicationImage.kt @@ -26,7 +26,9 @@ package com.atlarge.opendc.compute.core.image import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.core.resource.TagContainer +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.isActive +import java.lang.Exception import java.util.UUID import kotlin.coroutines.coroutineContext import kotlin.math.min @@ -64,11 +66,8 @@ data class FlopsApplicationImage( val burst = LongArray(cores) { flops / cores } val maxUsage = DoubleArray(cores) { i -> ctx.cpus[i].frequency * utilization } - while (coroutineContext.isActive) { - if (burst.all { it == 0L }) { - break - } - + while (burst.any { it != 0L }) { + coroutineContext.ensureActive() ctx.run(burst, maxUsage, Long.MAX_VALUE) } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt index fb2ff355..3956338b 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/BareMetalDriver.kt @@ -28,6 +28,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.monitor.NodeMonitor +import com.atlarge.opendc.core.failure.FailureDomain import com.atlarge.opendc.core.power.Powerable import com.atlarge.opendc.core.services.AbstractServiceKey import kotlinx.coroutines.flow.Flow @@ -36,7 +37,7 @@ import java.util.UUID /** * A driver interface for the management interface of a bare-metal compute node. */ -public interface BareMetalDriver : Powerable { +public interface BareMetalDriver : Powerable, FailureDomain { /** * The amount of work done by the machine in percentage with respect to the total amount of processing power * available. diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt index 2d803aa5..5f5dfb66 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriver.kt @@ -33,6 +33,8 @@ import com.atlarge.opendc.compute.core.Flavor import com.atlarge.opendc.compute.core.MemoryUnit import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.ShutdownException +import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.EmptyImage import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.metal.Node @@ -42,6 +44,7 @@ import com.atlarge.opendc.compute.metal.power.ConstantPowerModel import com.atlarge.opendc.core.power.PowerModel import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.launch @@ -50,6 +53,7 @@ import kotlin.math.ceil import kotlin.math.max import kotlin.math.min import kotlinx.coroutines.withContext +import java.lang.Exception /** * A basic implementation of the [BareMetalDriver] that simulates an [Image] running on a bare-metal machine. @@ -100,9 +104,9 @@ public class SimpleBareMetalDriver( private val flavor = Flavor(cpus.size, memoryUnits.map { it.size }.sum()) /** - * The job that is running the image. + * The current active server context. */ - private var job: Job? = null + private var serverContext: BareMetalServerContext? = null /** * The signal containing the load of the server. @@ -134,7 +138,7 @@ public class SimpleBareMetalDriver( server.serviceRegistry[BareMetalDriver.Key] = this@SimpleBareMetalDriver node = node.copy(state = NodeState.BOOT, server = server) - launch() + serverContext = BareMetalServerContext() return@withContext node } @@ -144,8 +148,8 @@ public class SimpleBareMetalDriver( } // We terminate the image running on the machine - job?.cancel() - job = null + serverContext!!.cancel(fail = false) + serverContext = null node = node.copy(state = NodeState.SHUTOFF, server = null) return@withContext node @@ -163,46 +167,56 @@ public class SimpleBareMetalDriver( override suspend fun refresh(): Node = withContext(domain.coroutineContext) { node } - /** - * Launch the server image on the machine. - */ - private suspend fun launch() { - val serverContext = serverCtx - - job = domain.launch { - serverContext.init() - try { - node.server!!.image(serverContext) - serverContext.exit() - } catch (cause: Throwable) { - serverContext.exit(cause) - } - } - } - - private val serverCtx = object : ServerManagementContext { - private var initialized: Boolean = false + private inner class BareMetalServerContext : ServerManagementContext { + private val job: Job + private var finalized: Boolean = false override val cpus: List<ProcessingUnit> = this@SimpleBareMetalDriver.cpus override val server: Server get() = node.server!! - override suspend fun init() { - if (initialized) { - throw IllegalStateException() + init { + job = domain.launch { + init() + try { + server.image(this@BareMetalServerContext) + exit() + } catch (cause: Throwable) { + exit(cause) + } } - initialized = true + } + /** + * Cancel the image running on the machine. + */ + suspend fun cancel(fail: Boolean) { + if (fail) + job.cancel(ShutdownException(cause = Exception("Random failure"))) + else + job.cancel(ShutdownException()) + job.join() + } + + override suspend fun init() { val server = server.copy(state = ServerState.ACTIVE) node = node.copy(state = NodeState.ACTIVE, server = server) } override suspend fun exit(cause: Throwable?) { - initialized = false - - val serverState = if (cause == null) ServerState.SHUTOFF else ServerState.ERROR - val nodeState = if (cause == null) node.state else NodeState.ERROR + finalized = true + + val serverState = + if (cause == null || (cause is ShutdownException && cause.cause == null)) + ServerState.SHUTOFF + else + ServerState.ERROR + val nodeState = + if (cause == null || (cause is ShutdownException && cause.cause != null)) + node.state + else + NodeState.ERROR val server = server.copy(state = serverState) node = node.copy(state = nodeState, server = server) } @@ -211,6 +225,7 @@ public class SimpleBareMetalDriver( override suspend fun run(burst: LongArray, limit: DoubleArray, deadline: Long) { require(burst.size == limit.size) { "Array dimensions do not match" } + assert(!finalized) { "Server instance is already finalized" } // If run is called in at the same timestamp as the previous call, cancel the load flush flush?.cancel() @@ -237,8 +252,9 @@ public class SimpleBareMetalDriver( try { delay(duration) - } catch (_: CancellationException) { - // On cancellation, we compute and return the remaining burst + } catch (e: CancellationException) { + // On non-failure cancellation, we compute and return the remaining burst + e.assertFailure() } val end = simulationContext.clock.millis() @@ -259,4 +275,10 @@ public class SimpleBareMetalDriver( } } } + + override suspend fun fail() { + withContext(domain.coroutineContext) { + serverContext?.cancel(fail = true) + } + } } diff --git a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt index 430e5a37..8bce7d9d 100644 --- a/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt +++ b/opendc/opendc-compute/src/main/kotlin/com/atlarge/opendc/compute/virt/driver/hypervisor/HypervisorVirtDriver.kt @@ -32,6 +32,7 @@ import com.atlarge.opendc.compute.core.Server import com.atlarge.opendc.compute.core.ServerState import com.atlarge.opendc.compute.core.execution.ServerContext import com.atlarge.opendc.compute.core.execution.ServerManagementContext +import com.atlarge.opendc.compute.core.execution.assertFailure import com.atlarge.opendc.compute.core.image.Image import com.atlarge.opendc.compute.core.monitor.ServerMonitor import com.atlarge.opendc.compute.virt.driver.VirtDriver @@ -297,11 +298,13 @@ class HypervisorVirtDriver( activeVms += this reschedule() chan.receive() - } catch (_: CancellationException) { + } catch (e: CancellationException) { // On cancellation, we compute and return the remaining burst + e.assertFailure() + } finally { + activeVms -= this + reschedule() } - activeVms -= this - reschedule() } } } diff --git a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt index 24a65b40..b78c0b8c 100644 --- a/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt +++ b/opendc/opendc-compute/src/test/kotlin/com/atlarge/opendc/compute/metal/driver/SimpleBareMetalDriverTest.kt @@ -34,6 +34,7 @@ import com.atlarge.opendc.compute.core.image.FlopsApplicationImage import com.atlarge.opendc.compute.metal.Node import com.atlarge.opendc.compute.metal.NodeState import com.atlarge.opendc.compute.metal.monitor.NodeMonitor +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -83,6 +84,6 @@ internal class SimpleBareMetalDriverTest { system.terminate() } - assertEquals(finalState, ServerState.SHUTOFF) + assertEquals(ServerState.SHUTOFF, finalState) } } |
