From 86c65e875b7dde8872dc81a37aa9dca72eee7782 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 19 Oct 2021 17:27:01 +0200 Subject: refactor(simulator): Support running workloads without coroutines This change updates the SimMachine interface to drop the coroutine requirement for running a workload on a machines. Users can now asynchronously start a workload and receive notifications via the workload callbacks. Users still have the possibility to suspend execution during workload execution by using the new `runWorkload` method, which is implemented on top of the new `startWorkload` primitive. --- .../org/opendc/simulator/compute/Coroutines.kt | 69 +++++++++++++ .../opendc/simulator/compute/SimAbstractMachine.kt | 115 ++++++++++++--------- .../org/opendc/simulator/compute/SimMachine.kt | 17 ++- .../compute/kernel/SimAbstractHypervisor.kt | 81 +++++++++++---- .../simulator/compute/kernel/SimHypervisor.kt | 9 +- .../simulator/compute/workload/SimFlopsWorkload.kt | 2 + .../compute/workload/SimRuntimeWorkload.kt | 2 + .../simulator/compute/workload/SimTraceWorkload.kt | 2 + .../simulator/compute/workload/SimWorkload.kt | 7 ++ .../compute/workload/SimWorkloadLifecycle.kt | 53 +++++++--- 10 files changed, 264 insertions(+), 93 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt (limited to 'opendc-simulator/opendc-simulator-compute/src/main') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt new file mode 100644 index 00000000..c23f48dc --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opendc.simulator.compute.workload.SimWorkload +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Run the specified [SimWorkload] on this machine and suspend execution util [workload] has finished. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. + */ +public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map = emptyMap()) { + return suspendCancellableCoroutine { cont -> + cont.invokeOnCancellation { this@runWorkload.cancel() } + + startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + workload.onStart(ctx) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + try { + workload.onStop(ctx) + + if (!cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + }, + meta + ) + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 5909d980..6a4c594d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* +import mu.KotlinLogging import org.opendc.simulator.compute.device.SimNetworkAdapter import org.opendc.simulator.compute.device.SimPeripheral import org.opendc.simulator.compute.model.MachineModel @@ -31,8 +32,6 @@ import org.opendc.simulator.compute.model.NetworkAdapter import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume /** * Abstract implementation of the [SimMachine] interface. @@ -71,56 +70,20 @@ public abstract class SimAbstractMachine( */ public override val peripherals: List = net.map { it as SimNetworkAdapter } - /** - * A flag to indicate that the machine is terminated. - */ - private var isTerminated = false - /** * The current active [Context]. */ private var _ctx: Context? = null - /** - * This method is invoked when the machine is started. - */ - protected open fun onStart(ctx: SimMachineContext) {} - - /** - * This method is invoked when the machine is stopped. - */ - protected open fun onStop(ctx: SimMachineContext) { - _ctx = null - } - - /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. - */ - override suspend fun run(workload: SimWorkload, meta: Map) { - check(!isTerminated) { "Machine is terminated" } + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { check(_ctx == null) { "A machine cannot run concurrently" } - return suspendCancellableCoroutine { cont -> - val ctx = Context(meta, cont) - _ctx = ctx - - // Cancel all cpus on cancellation - cont.invokeOnCancellation { ctx.close() } - - engine.batch { - onStart(ctx) - - workload.onStart(ctx) - } - } + val ctx = Context(workload, meta) + ctx.start() + return ctx } - override fun close() { - if (isTerminated) { - return - } - - isTerminated = true + override fun cancel() { _ctx?.close() } @@ -130,15 +93,33 @@ public abstract class SimAbstractMachine( /** * The execution context in which the workload runs. + * + * @param workload The workload that is running on the machine. + * @param meta The metadata passed to the workload. */ - private inner class Context(override val meta: Map, private val cont: Continuation) : SimMachineContext { + private inner class Context( + private val workload: SimWorkload, + override val meta: Map + ) : SimMachineContext { /** * A flag to indicate that the context has been closed. */ private var isClosed = false - override val engine: FlowEngine - get() = this@SimAbstractMachine.engine + override val engine: FlowEngine = this@SimAbstractMachine.engine + + /** + * Start this context. + */ + fun start() { + try { + _ctx = this + engine.batch { workload.onStart(this) } + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStart callback" } + close() + } + } override val cpus: List = this@SimAbstractMachine.cpus @@ -154,15 +135,43 @@ public abstract class SimAbstractMachine( } isClosed = true + assert(_ctx == this) { "Invariant violation: multiple contexts active for a single machine" } + _ctx = null + + // Cancel all the resources associated with the machine + doCancel() + + try { + workload.onStop(this) + } catch (cause: Throwable) { + logger.warn(cause) { "Workload failed during onStop callback" } + } + } + + /** + * Run the stop procedures for the resources associated with the machine. + */ + private fun doCancel() { engine.batch { for (cpu in cpus) { cpu.cancel() } - } - onStop(this) - cont.resume(Unit) + memory.cancel() + + for (ifx in net) { + (ifx as NetworkAdapterImpl).disconnect() + } + + for (storage in storage) { + val impl = storage as StorageDeviceImpl + impl.read.cancel() + impl.write.cancel() + } + } } + + override fun toString(): String = "SimAbstractMachine.Context" } /** @@ -218,4 +227,12 @@ public abstract class SimAbstractMachine( override fun toString(): String = "SimAbstractMachine.StorageDeviceImpl[name=$name,capacity=$capacity]" } + + private companion object { + /** + * The logging instance associated with this class. + */ + @JvmStatic + private val logger = KotlinLogging.logger {} + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt index ab0b56ae..94581e89 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimMachine.kt @@ -29,7 +29,7 @@ import org.opendc.simulator.compute.workload.SimWorkload /** * A generic machine that is able to run a [SimWorkload]. */ -public interface SimMachine : AutoCloseable { +public interface SimMachine { /** * The model of the machine containing its specifications. */ @@ -41,12 +41,19 @@ public interface SimMachine : AutoCloseable { public val peripherals: List /** - * Converge the specified [SimWorkload] on this machine and suspend execution util the workload has finished. + * Start the specified [SimWorkload] on this machine. + * + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @return A [SimMachineContext] that represents the execution context for the workload. + * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - public suspend fun run(workload: SimWorkload, meta: Map = emptyMap()) + public fun startWorkload(workload: SimWorkload, meta: Map = emptyMap()): SimMachineContext /** - * Terminate this machine. + * Cancel the workload that is currently running on this machine. + * + * If no workload is active, this operation is a no-op. */ - public override fun close() + public fun cancel() } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index eda59d2d..07465126 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -28,6 +28,7 @@ import org.opendc.simulator.compute.kernel.cpufreq.ScalingPolicy import org.opendc.simulator.compute.kernel.interference.VmInterferenceDomain import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer @@ -93,13 +94,20 @@ public abstract class SimAbstractHypervisor( private val governors = mutableListOf() /* SimHypervisor */ - override fun createMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { + override fun newMachine(model: MachineModel, interferenceId: String?): SimVirtualMachine { require(canFit(model)) { "Machine does not fit" } val vm = VirtualMachine(model, interferenceId) _vms.add(vm) return vm } + override fun removeMachine(machine: SimVirtualMachine) { + if (_vms.remove(machine)) { + // This cast must always succeed, since `_vms` only contains `VirtualMachine` types. + (machine as VirtualMachine).close() + } + } + /* SimWorkload */ override fun onStart(ctx: SimMachineContext) { context = ctx @@ -122,6 +130,8 @@ public abstract class SimAbstractHypervisor( } } + override fun onStop(ctx: SimMachineContext) {} + private var _cpuCount = 0 private var _cpuCapacity = 0.0 @@ -145,11 +155,16 @@ public abstract class SimAbstractHypervisor( private inner class VirtualMachine( model: MachineModel, interferenceId: String? = null - ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine { + ) : SimAbstractMachine(engine, parent = null, model), SimVirtualMachine, AutoCloseable { + /** + * A flag to indicate that the machine is closed. + */ + private var isClosed = false + /** * The interference key of this virtual machine. */ - private var interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } + private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. @@ -181,36 +196,60 @@ public abstract class SimAbstractHypervisor( override val cpuUsage: Double get() = cpus.sumOf(FlowConsumer::rate) - override fun onStart(ctx: SimMachineContext) { - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.join(interferenceKey) - } - - super.onStart(ctx) + override fun startWorkload(workload: SimWorkload, meta: Map): SimMachineContext { + check(!isClosed) { "Machine is closed" } + + return super.startWorkload( + object : SimWorkload { + override fun onStart(ctx: SimMachineContext) { + try { + joinInterferenceDomain() + workload.onStart(ctx) + } catch (cause: Throwable) { + leaveInterferenceDomain() + throw cause + } + } + + override fun onStop(ctx: SimMachineContext) { + leaveInterferenceDomain() + workload.onStop(ctx) + } + }, + meta + ) } - override fun onStop(ctx: SimMachineContext) { - super.onStop(ctx) - - val interferenceKey = interferenceKey - if (interferenceKey != null) { - interferenceDomain?.leave(interferenceKey) + override fun close() { + if (isClosed) { + return } - } - override fun close() { - super.close() + isClosed = true + cancel() for (cpu in cpus) { cpu.close() } + } - _vms.remove(this) + /** + * Join the interference domain of the hypervisor. + */ + private fun joinInterferenceDomain() { + val interferenceKey = interferenceKey + if (interferenceKey != null) { + interferenceDomain?.join(interferenceKey) + } + } + /** + * Leave the interference domain of the hypervisor. + */ + private fun leaveInterferenceDomain() { val interferenceKey = interferenceKey if (interferenceKey != null) { - interferenceDomain?.removeKey(interferenceKey) + interferenceDomain?.leave(interferenceKey) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index 57d4cf20..a69f419f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -67,5 +67,12 @@ public interface SimHypervisor : SimWorkload { * @param model The machine to create. * @param interferenceId An identifier for the interference model. */ - public fun createMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + public fun newMachine(model: MachineModel, interferenceId: String? = null): SimVirtualMachine + + /** + * Remove the specified [machine] from the hypervisor. + * + * @param machine The machine to remove. + */ + public fun removeMachine(machine: SimVirtualMachine) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt index 99f4a1e1..726d1f56 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimFlopsWorkload.kt @@ -48,5 +48,7 @@ public class SimFlopsWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimFlopsWorkload(FLOPs=$flops,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt index 2ef3bc43..8a3f5f84 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimRuntimeWorkload.kt @@ -48,5 +48,7 @@ public class SimRuntimeWorkload( } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimRuntimeWorkload(duration=$duration,utilization=$utilization)" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt index 53c98409..ce04a790 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimTraceWorkload.kt @@ -40,5 +40,7 @@ public class SimTraceWorkload(private val trace: SimTrace, private val offset: L } } + override fun onStop(ctx: SimMachineContext) {} + override fun toString(): String = "SimTraceWorkload" } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt index b80665fa..61c6e2ad 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkload.kt @@ -37,4 +37,11 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ public fun onStart(ctx: SimMachineContext) + + /** + * This method is invoked when the workload is stopped. + * + * @param ctx The execution context in which the machine runs. + */ + public fun onStop(ctx: SimMachineContext) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt index cc4f1f6a..742470a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.kt @@ -33,31 +33,50 @@ public class SimWorkloadLifecycle(private val ctx: SimMachineContext) { /** * The resource consumers which represent the lifecycle of the workload. */ - private val waiting = mutableSetOf() + private val waiting = HashSet() /** - * Wait for the specified [consumer] to complete before ending the lifecycle of the workload. + * Wait for the specified [source] to complete before ending the lifecycle of the workload. */ - public fun waitFor(consumer: FlowSource): FlowSource { - waiting.add(consumer) - return object : FlowSource by consumer { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - try { - consumer.onStop(conn, now, delta) - } finally { - complete(consumer) - } - } - override fun toString(): String = "SimWorkloadLifecycle.Consumer[delegate=$consumer]" - } + public fun waitFor(source: FlowSource): FlowSource { + val wrapper = Wrapper(source) + waiting.add(wrapper) + return wrapper } /** - * Complete the specified [FlowSource]. + * Complete the specified [Wrapper]. */ - private fun complete(consumer: FlowSource) { - if (waiting.remove(consumer) && waiting.isEmpty()) { + private fun complete(wrapper: Wrapper) { + if (waiting.remove(wrapper) && waiting.isEmpty()) { ctx.close() } } + + /** + * A [FlowSource] that wraps [delegate] and informs [SimWorkloadLifecycle] that is has completed. + */ + private inner class Wrapper(private val delegate: FlowSource) : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + delegate.onStart(conn, now) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { + complete(this) + } + } + + override fun toString(): String = "SimWorkloadLifecycle.Wrapper[delegate=$delegate]" + } } -- cgit v1.2.3