From 3528091684f610d80fcebb5b730d3a201e79a99a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:48:59 +0200 Subject: feat(sim/compute): Add completion parameter to startWorkload This change updates the interface of `SimMachine#startWorkload` to introduce a parameter `completion` that is invoked when the workload completes either succesfully or due to failure. This functionality has often been implemented by wrapping a `SimWorkload` and catching its exceptions. However, since this functionality is used in all usages of `SimMachine#startWorkload` we instead embed it into `SimMachine` itself. --- .../simulator/compute/SimAbstractMachine.java | 36 +++++++++++++++++----- .../simulator/compute/SimBareMetalMachine.java | 16 +++++++--- .../org/opendc/simulator/compute/SimMachine.java | 4 ++- .../simulator/compute/kernel/SimHypervisor.java | 19 +++++++++--- .../org/opendc/simulator/compute/Coroutines.kt | 29 ++--------------- 5 files changed, 60 insertions(+), 44 deletions(-) (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index cf5aed03..d90a7d6f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -25,6 +25,7 @@ package org.opendc.simulator.compute; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimNetworkAdapter; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.model.MemoryUnit; @@ -62,12 +63,13 @@ public abstract class SimAbstractMachine implements SimMachine { } @Override - public final SimMachineContext startWorkload(SimWorkload workload, Map meta) { + public final SimMachineContext startWorkload( + SimWorkload workload, Map meta, Consumer completion) { if (activeContext != null) { throw new IllegalStateException("A machine cannot run multiple workloads concurrently"); } - final Context ctx = createContext(workload, new HashMap<>(meta)); + final Context ctx = createContext(workload, new HashMap<>(meta), completion); ctx.start(); return ctx; } @@ -83,10 +85,12 @@ public abstract class SimAbstractMachine implements SimMachine { /** * Construct a new {@link Context} instance representing the active execution. * - * @param workload The workload to start on the machine. - * @param meta The metadata to pass to the workload. + * @param workload The workload to start on the machine. + * @param meta The metadata to pass to the workload. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. */ - protected abstract Context createContext(SimWorkload workload, Map meta); + protected abstract Context createContext( + SimWorkload workload, Map meta, Consumer completion); /** * Return the active {@link Context} instance (if any). @@ -102,7 +106,9 @@ public abstract class SimAbstractMachine implements SimMachine { private final SimAbstractMachine machine; private final SimWorkload workload; private final Map meta; + private final Consumer completion; private boolean isClosed; + private Exception cause; /** * Construct a new {@link Context} instance. @@ -110,11 +116,17 @@ public abstract class SimAbstractMachine implements SimMachine { * @param machine The {@link SimAbstractMachine} to which the context belongs. * @param workload The {@link SimWorkload} to which the context belongs. * @param meta The metadata passed to the context. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. */ - public Context(SimAbstractMachine machine, SimWorkload workload, Map meta) { + public Context( + SimAbstractMachine machine, + SimWorkload workload, + Map meta, + Consumer completion) { this.machine = machine; this.workload = workload; this.meta = meta; + this.completion = completion; } @Override @@ -136,11 +148,19 @@ public abstract class SimAbstractMachine implements SimMachine { // Cancel all the resources associated with the machine doCancel(); + Exception e = this.cause; + try { workload.onStop(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStop callback", cause); + if (e != null) { + e.addSuppressed(cause); + } else { + e = cause; + } } + + completion.accept(e); } /** @@ -151,7 +171,7 @@ public abstract class SimAbstractMachine implements SimMachine { machine.activeContext = this; workload.onStart(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStart callback", cause); + this.cause = cause; shutdown(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java index aa7502d6..11356eb2 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimBareMetalMachine.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimPeripheral; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.model.ProcessingUnit; @@ -39,7 +40,7 @@ import org.opendc.simulator.flow2.Inlet; * *

* A {@link SimBareMetalMachine} is a stateful object, and you should be careful when operating this object concurrently. For - * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map)}. + * example, the class expects only a single concurrent call to {@link #startWorkload(SimWorkload, Map, Consumer)} )}. */ public final class SimBareMetalMachine extends SimAbstractMachine { /** @@ -192,8 +193,9 @@ public final class SimBareMetalMachine extends SimAbstractMachine { } @Override - protected SimAbstractMachine.Context createContext(SimWorkload workload, Map meta) { - return new Context(this, workload, meta); + protected SimAbstractMachine.Context createContext( + SimWorkload workload, Map meta, Consumer completion) { + return new Context(this, workload, meta, completion); } /** @@ -206,8 +208,12 @@ public final class SimBareMetalMachine extends SimAbstractMachine { private final List net; private final List disk; - private Context(SimBareMetalMachine machine, SimWorkload workload, Map meta) { - super(machine, workload, meta); + private Context( + SimBareMetalMachine machine, + SimWorkload workload, + Map meta, + Consumer completion) { + super(machine, workload, meta, completion); this.graph = machine.graph; this.cpus = machine.cpus; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java index 59599875..1f86aa02 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachine.java @@ -24,6 +24,7 @@ package org.opendc.simulator.compute; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.device.SimPeripheral; import org.opendc.simulator.compute.model.MachineModel; import org.opendc.simulator.compute.workload.SimWorkload; @@ -47,10 +48,11 @@ public interface SimMachine { * * @param workload The workload to start on the machine. * @param meta The metadata to pass to the workload. + * @param completion A block that is invoked when the workload completes carrying an exception if thrown by the workload. * @return A {@link SimMachineContext} that represents the execution context for the workload. * @throws IllegalStateException if a workload is already active on the machine or if the machine is closed. */ - SimMachineContext startWorkload(SimWorkload workload, Map meta); + SimMachineContext startWorkload(SimWorkload workload, Map meta, Consumer completion); /** * Cancel the active workload on this machine (if any). diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java index 6e295837..f03a0c20 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SplittableRandom; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.opendc.simulator.compute.SimAbstractMachine; import org.opendc.simulator.compute.SimMachine; @@ -471,7 +472,8 @@ public final class SimHypervisor implements SimWorkload { } @Override - protected Context createContext(SimWorkload workload, Map meta) { + protected Context createContext( + SimWorkload workload, Map meta, Consumer completion) { if (isClosed) { throw new IllegalStateException("Virtual machine does not exist anymore"); } @@ -482,7 +484,15 @@ public final class SimHypervisor implements SimWorkload { } return new VmContext( - context, this, random, interferenceDomain, counters, SimHypervisor.this.counters, workload, meta); + context, + this, + random, + interferenceDomain, + counters, + SimHypervisor.this.counters, + workload, + meta, + completion); } @Override @@ -538,8 +548,9 @@ public final class SimHypervisor implements SimWorkload { VmCounters vmCounters, HvCounters hvCounters, SimWorkload workload, - Map meta) { - super(machine, workload, meta); + Map meta, + Consumer completion) { + super(machine, workload, meta, completion); this.context = context; this.random = random; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index c23f48dc..b354caff 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -39,31 +39,8 @@ public suspend fun SimMachine.runWorkload(workload: SimWorkload, meta: Map cont.invokeOnCancellation { this@runWorkload.cancel() } - startWorkload( - object : SimWorkload { - override fun onStart(ctx: SimMachineContext) { - try { - workload.onStart(ctx) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onStop(ctx: SimMachineContext) { - try { - workload.onStop(ctx) - - if (!cont.isCompleted) { - cont.resume(Unit) - } - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - }, - meta - ) + startWorkload(workload, meta) { cause -> + if (cause != null) cont.resumeWithException(cause) else cont.resume(Unit) + } } } -- cgit v1.2.3 From 788c007599ac61a41460589f65454aac1857eb81 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:55:41 +0200 Subject: refactor(sim/compute): Provide workload constructors in SimWorkloads This change introduces a new class SimWorkloads which provides construction methods for the standard workloads available in OpenDC. --- .../compute/workload/SimFlopsWorkload.java | 4 +- .../compute/workload/SimRuntimeWorkload.java | 4 +- .../compute/workload/SimWorkloadLifecycle.java | 60 --------------------- .../simulator/compute/workload/SimWorkloads.java | 63 ++++++++++++++++++++++ .../org/opendc/simulator/compute/SimMachineTest.kt | 14 ++--- .../compute/kernel/SimSpaceSharedHypervisorTest.kt | 13 +++-- 6 files changed, 80 insertions(+), 78 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java index 255fd1b2..f3efbebb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java @@ -49,9 +49,9 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { * Construct a new {@link SimFlopsWorkload}. * * @param flops The number of floating point operations to perform for this task in MFLOPs. - * @param utilization A model of the CPU utilization of the application. + * @param utilization The CPU utilization of the workload. */ - public SimFlopsWorkload(long flops, double utilization) { + SimFlopsWorkload(long flops, double utilization) { if (flops < 0) { throw new IllegalArgumentException("Number of FLOPs must be positive"); } else if (utilization <= 0.0 || utilization > 1.0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java index c3380b31..194efafd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java @@ -48,9 +48,9 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic { * Construct a new {@link SimRuntimeWorkload}. * * @param duration The duration of the workload in milliseconds. - * @param utilization A model of the CPU utilization of the application. + * @param utilization The CPU utilization of the workload. */ - public SimRuntimeWorkload(long duration, double utilization) { + SimRuntimeWorkload(long duration, double utilization) { if (duration < 0) { throw new IllegalArgumentException("Duration must be positive"); } else if (utilization <= 0.0 || utilization > 1.0) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java deleted file mode 100644 index f0e2561f..00000000 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloadLifecycle.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.compute.workload; - -import java.util.HashSet; -import org.opendc.simulator.compute.SimMachineContext; - -/** - * A helper class to manage the lifecycle of a {@link SimWorkload}. - */ -public final class SimWorkloadLifecycle { - private final SimMachineContext ctx; - private final HashSet waiting = new HashSet<>(); - - /** - * Construct a {@link SimWorkloadLifecycle} instance. - * - * @param ctx The {@link SimMachineContext} of the workload. - */ - public SimWorkloadLifecycle(SimMachineContext ctx) { - this.ctx = ctx; - } - - /** - * Register a "completer" callback that must be invoked before ending the lifecycle of the workload. - */ - public Runnable newCompleter() { - Runnable completer = new Runnable() { - @Override - public void run() { - final HashSet waiting = SimWorkloadLifecycle.this.waiting; - if (waiting.remove(this) && waiting.isEmpty()) { - ctx.shutdown(); - } - } - }; - waiting.add(completer); - return completer; - } -} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java new file mode 100644 index 00000000..8655fd0a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.time.Duration; + +/** + * Helper methods for constructing {@link SimWorkload}s. + */ +public class SimWorkloads { + private SimWorkloads() {} + + /** + * Create a {@link SimWorkload} that executes a specified number of floating point operations (FLOPs) at the given + * utilization. + * + * @param flops The number of floating point operations to perform for this task in MFLOPs. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload flops(long flops, double utilization) { + return new SimFlopsWorkload(flops, utilization); + } + + /** + * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization. + * + * @param duration The duration of the workload in milliseconds. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload runtime(long duration, double utilization) { + return new SimRuntimeWorkload(duration, utilization); + } + + /** + * Create a {@link SimWorkload} that consumes the CPU resources for a specified duration at the given utilization. + * + * @param duration The duration of the workload. + * @param utilization The CPU utilization of the workload. + */ + public static SimWorkload runtime(Duration duration, double utilization) { + return runtime(duration.toMillis(), utilization); + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index f0aae15b..266839bd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -40,9 +40,9 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.model.StorageDevice import org.opendc.simulator.compute.power.CpuPowerModels -import org.opendc.simulator.compute.workload.SimFlopsWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.source.SimpleFlowSource import org.opendc.simulator.kotlin.runSimulation @@ -78,7 +78,7 @@ class SimMachineTest { machineModel ) - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) // Two cores execute 1000 MFlOps per second (1000 ms) assertEquals(1000, clock.millis()) @@ -123,7 +123,7 @@ class SimMachineTest { machineModel ) - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) // Two sockets with two cores execute 2000 MFlOps per second (500 ms) assertEquals(500, clock.millis()) @@ -142,7 +142,7 @@ class SimMachineTest { source.connect(machine.psu) coroutineScope { - launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) } + launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } yield() assertAll( @@ -304,7 +304,7 @@ class SimMachineTest { try { coroutineScope { - launch { machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) } + launch { machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } cancel() } } catch (_: CancellationException) { @@ -326,11 +326,11 @@ class SimMachineTest { coroutineScope { launch { - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } assertThrows { - machine.runWorkload(SimFlopsWorkload(2_000, /*utilization*/ 1.0)) + machine.runWorkload(SimWorkloads.flops(2_000, /*utilization*/ 1.0)) } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt index ba5a5c68..d11b91ee 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimSpaceSharedHypervisorTest.kt @@ -38,10 +38,9 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.runWorkload -import org.opendc.simulator.compute.workload.SimFlopsWorkload -import org.opendc.simulator.compute.workload.SimRuntimeWorkload import org.opendc.simulator.compute.workload.SimTrace import org.opendc.simulator.compute.workload.SimTraceFragment +import org.opendc.simulator.compute.workload.SimWorkloads import org.opendc.simulator.flow2.FlowEngine import org.opendc.simulator.flow2.mux.FlowMultiplexerFactory import org.opendc.simulator.kotlin.runSimulation @@ -99,7 +98,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testRuntimeWorkload() = runSimulation { val duration = 5 * 60L * 1000 - val workload = SimRuntimeWorkload(duration, 1.0) + val workload = SimWorkloads.runtime(duration, 1.0) val engine = FlowEngine.create(coroutineContext, clock) val graph = engine.newGraph() @@ -123,7 +122,7 @@ internal class SimSpaceSharedHypervisorTest { @Test fun testFlopsWorkload() = runSimulation { val duration = 5 * 60L * 1000 - val workload = SimFlopsWorkload((duration * 3.2).toLong(), 1.0) + val workload = SimWorkloads.flops((duration * 3.2).toLong(), 1.0) val engine = FlowEngine.create(coroutineContext, clock) val graph = engine.newGraph() @@ -155,13 +154,13 @@ internal class SimSpaceSharedHypervisorTest { yield() val vm = hypervisor.newMachine(machineModel) - vm.runWorkload(SimRuntimeWorkload(duration, 1.0)) + vm.runWorkload(SimWorkloads.runtime(duration, 1.0)) hypervisor.removeMachine(vm) yield() val vm2 = hypervisor.newMachine(machineModel) - vm2.runWorkload(SimRuntimeWorkload(duration, 1.0)) + vm2.runWorkload(SimWorkloads.runtime(duration, 1.0)) hypervisor.removeMachine(vm2) machine.cancel() @@ -184,7 +183,7 @@ internal class SimSpaceSharedHypervisorTest { yield() val vm = hypervisor.newMachine(machineModel) - launch { vm.runWorkload(SimFlopsWorkload(10_000, 1.0)) } + launch { vm.runWorkload(SimWorkloads.runtime(10_000, 1.0)) } yield() assertAll( -- cgit v1.2.3 From 801ef92fb4d65e301e2488528f1cadd8b4ab6fdd Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:56:49 +0200 Subject: feat(sim/compute): Add support for resetting machine context This change updates the interface of `SimMachineContext` to allow workloads to reset all resources provided by the machine to the workload. This allows us to implement a `SimWorkload` that can compose multiple workloads. --- .../simulator/compute/SimAbstractMachine.java | 41 ++++++++++++---------- .../simulator/compute/SimMachineContext.java | 5 +++ 2 files changed, 28 insertions(+), 18 deletions(-) (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index d90a7d6f..d968d884 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -134,6 +134,28 @@ public abstract class SimAbstractMachine implements SimMachine { return meta; } + @Override + public void reset() { + final FlowGraph graph = getMemory().getInput().getGraph(); + + for (SimProcessingUnit cpu : getCpus()) { + final Inlet inlet = cpu.getInput(); + graph.disconnect(inlet); + } + + graph.disconnect(getMemory().getInput()); + + for (SimNetworkInterface ifx : getNetworkInterfaces()) { + ((NetworkAdapter) ifx).disconnect(); + } + + for (SimStorageInterface storage : getStorageInterfaces()) { + StorageDevice impl = (StorageDevice) storage; + graph.disconnect(impl.getRead()); + graph.disconnect(impl.getWrite()); + } + } + @Override public final void shutdown() { if (isClosed) { @@ -180,24 +202,7 @@ public abstract class SimAbstractMachine implements SimMachine { * Run the stop procedures for the resources associated with the machine. */ protected void doCancel() { - final FlowGraph graph = getMemory().getInput().getGraph(); - - for (SimProcessingUnit cpu : getCpus()) { - final Inlet inlet = cpu.getInput(); - graph.disconnect(inlet); - } - - graph.disconnect(getMemory().getInput()); - - for (SimNetworkInterface ifx : getNetworkInterfaces()) { - ((NetworkAdapter) ifx).disconnect(); - } - - for (SimStorageInterface storage : getStorageInterfaces()) { - StorageDevice impl = (StorageDevice) storage; - graph.disconnect(impl.getRead()); - graph.disconnect(impl.getWrite()); - } + reset(); } @Override diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java index f6a3bd38..5d08e2b7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java @@ -67,6 +67,11 @@ public interface SimMachineContext { */ List getStorageInterfaces(); + /** + * Reset all resources of the machine. + */ + void reset(); + /** * Shutdown the workload. */ -- cgit v1.2.3 From eb105ae96191d99745c145e5ef0959f5e5c77fc4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 28 Oct 2022 11:58:21 +0200 Subject: feat(sim/compute): Add support for chaining workloads This change adds a new static method `chain` to `SimWorkloads` to chain multiple workloads sequentially. --- .../compute/workload/SimChainWorkload.java | 182 +++++++++++++++++++++ .../simulator/compute/workload/SimWorkloads.java | 7 + .../compute/workload/SimChainWorkloadTest.kt | 176 ++++++++++++++++++++ 3 files changed, 365 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java new file mode 100644 index 00000000..9304122a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.util.List; +import java.util.Map; +import org.opendc.simulator.compute.SimMachineContext; +import org.opendc.simulator.compute.SimMemory; +import org.opendc.simulator.compute.SimNetworkInterface; +import org.opendc.simulator.compute.SimProcessingUnit; +import org.opendc.simulator.compute.SimStorageInterface; +import org.opendc.simulator.flow2.FlowGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link SimWorkload} that composes two {@link SimWorkload}s. + */ +final class SimChainWorkload implements SimWorkload { + private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class); + + private final SimWorkload[] workloads; + private int activeWorkloadIndex; + + private Context activeContext; + + /** + * Construct a {@link SimChainWorkload} instance. + * + * @param workloads The workloads to chain. + */ + SimChainWorkload(SimWorkload... workloads) { + this.workloads = workloads; + } + + @Override + public void onStart(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = new Context(ctx); + activeContext = context; + + if (!context.doStart(workloads[activeWorkloadIndex])) { + ctx.shutdown(); + } + } + + @Override + public void onStop(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = activeContext; + activeContext = null; + + context.doStop(workloads[activeWorkloadIndex]); + } + + /** + * A {@link SimMachineContext} that intercepts the shutdown calls. + */ + private class Context implements SimMachineContext { + private final SimMachineContext ctx; + + private Context(SimMachineContext ctx) { + this.ctx = ctx; + } + + @Override + public FlowGraph getGraph() { + return ctx.getGraph(); + } + + @Override + public Map getMeta() { + return ctx.getMeta(); + } + + @Override + public List getCpus() { + return ctx.getCpus(); + } + + @Override + public SimMemory getMemory() { + return ctx.getMemory(); + } + + @Override + public List getNetworkInterfaces() { + return ctx.getNetworkInterfaces(); + } + + @Override + public List getStorageInterfaces() { + return ctx.getStorageInterfaces(); + } + + @Override + public void reset() { + ctx.reset(); + } + + @Override + public void shutdown() { + final SimWorkload[] workloads = SimChainWorkload.this.workloads; + final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex; + + if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) { + ctx.reset(); + + if (doStart(workloads[activeWorkloadIndex])) { + return; + } + } + + ctx.shutdown(); + } + + /** + * Start the specified workload. + * + * @return true if the workload started successfully, false otherwise. + */ + private boolean doStart(SimWorkload workload) { + try { + workload.onStart(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStart callback", cause); + doStop(workload); + return false; + } + + return true; + } + + /** + * Stop the specified workload. + * + * @return true if the workload stopped successfully, false otherwise. + */ + private boolean doStop(SimWorkload workload) { + try { + workload.onStop(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStop callback", cause); + return false; + } + + return true; + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java index 8655fd0a..82557d06 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java @@ -60,4 +60,11 @@ public class SimWorkloads { public static SimWorkload runtime(Duration duration, double utilization) { return runtime(duration.toMillis(), utilization); } + + /** + * Chain the specified workloads into a single {@link SimWorkload}. + */ + public static SimWorkload chain(SimWorkload... workloads) { + return new SimChainWorkload(workloads); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt new file mode 100644 index 00000000..6bf05f65 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.runWorkload +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.kotlin.runSimulation + +/** + * Test suite for the [SimChainWorkload] class. + */ +class SimChainWorkloadTest { + private lateinit var machineModel: MachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = MachineModel( + /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testMultipleWorkloads() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } + + @Test + fun testStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(0, clock.millis()) + } + + @Test + fun testStartFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } +} -- cgit v1.2.3