diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-28 11:58:21 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-28 12:03:40 +0200 |
| commit | eb105ae96191d99745c145e5ef0959f5e5c77fc4 (patch) | |
| tree | 841f5e8a428ab0cc1f34e2f13f2bf73a44a212bf | |
| parent | 801ef92fb4d65e301e2488528f1cadd8b4ab6fdd (diff) | |
feat(sim/compute): Add support for chaining workloads
This change adds a new static method `chain` to `SimWorkloads` to chain
multiple workloads sequentially.
3 files changed, 365 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java new file mode 100644 index 00000000..9304122a --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload; + +import java.util.List; +import java.util.Map; +import org.opendc.simulator.compute.SimMachineContext; +import org.opendc.simulator.compute.SimMemory; +import org.opendc.simulator.compute.SimNetworkInterface; +import org.opendc.simulator.compute.SimProcessingUnit; +import org.opendc.simulator.compute.SimStorageInterface; +import org.opendc.simulator.flow2.FlowGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link SimWorkload} that composes two {@link SimWorkload}s. + */ +final class SimChainWorkload implements SimWorkload { + private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class); + + private final SimWorkload[] workloads; + private int activeWorkloadIndex; + + private Context activeContext; + + /** + * Construct a {@link SimChainWorkload} instance. + * + * @param workloads The workloads to chain. + */ + SimChainWorkload(SimWorkload... workloads) { + this.workloads = workloads; + } + + @Override + public void onStart(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = new Context(ctx); + activeContext = context; + + if (!context.doStart(workloads[activeWorkloadIndex])) { + ctx.shutdown(); + } + } + + @Override + public void onStop(SimMachineContext ctx) { + final SimWorkload[] workloads = this.workloads; + final int activeWorkloadIndex = this.activeWorkloadIndex; + + if (activeWorkloadIndex >= workloads.length) { + return; + } + + final Context context = activeContext; + activeContext = null; + + context.doStop(workloads[activeWorkloadIndex]); + } + + /** + * A {@link SimMachineContext} that intercepts the shutdown calls. + */ + private class Context implements SimMachineContext { + private final SimMachineContext ctx; + + private Context(SimMachineContext ctx) { + this.ctx = ctx; + } + + @Override + public FlowGraph getGraph() { + return ctx.getGraph(); + } + + @Override + public Map<String, Object> getMeta() { + return ctx.getMeta(); + } + + @Override + public List<? extends SimProcessingUnit> getCpus() { + return ctx.getCpus(); + } + + @Override + public SimMemory getMemory() { + return ctx.getMemory(); + } + + @Override + public List<? extends SimNetworkInterface> getNetworkInterfaces() { + return ctx.getNetworkInterfaces(); + } + + @Override + public List<? extends SimStorageInterface> getStorageInterfaces() { + return ctx.getStorageInterfaces(); + } + + @Override + public void reset() { + ctx.reset(); + } + + @Override + public void shutdown() { + final SimWorkload[] workloads = SimChainWorkload.this.workloads; + final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex; + + if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) { + ctx.reset(); + + if (doStart(workloads[activeWorkloadIndex])) { + return; + } + } + + ctx.shutdown(); + } + + /** + * Start the specified workload. + * + * @return <code>true</code> if the workload started successfully, <code>false</code> otherwise. + */ + private boolean doStart(SimWorkload workload) { + try { + workload.onStart(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStart callback", cause); + doStop(workload); + return false; + } + + return true; + } + + /** + * Stop the specified workload. + * + * @return <code>true</code> if the workload stopped successfully, <code>false</code> otherwise. + */ + private boolean doStop(SimWorkload workload) { + try { + workload.onStop(this); + } catch (Exception cause) { + LOGGER.warn("Workload failed during onStop callback", cause); + return false; + } + + return true; + } + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java index 8655fd0a..82557d06 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkloads.java @@ -60,4 +60,11 @@ public class SimWorkloads { public static SimWorkload runtime(Duration duration, double utilization) { return runtime(duration.toMillis(), utilization); } + + /** + * Chain the specified <code>workloads</code> into a single {@link SimWorkload}. + */ + public static SimWorkload chain(SimWorkload... workloads) { + return new SimChainWorkload(workloads); + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt new file mode 100644 index 00000000..6bf05f65 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.model.MachineModel +import org.opendc.simulator.compute.model.MemoryUnit +import org.opendc.simulator.compute.model.ProcessingNode +import org.opendc.simulator.compute.model.ProcessingUnit +import org.opendc.simulator.compute.runWorkload +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.kotlin.runSimulation + +/** + * Test suite for the [SimChainWorkload] class. + */ +class SimChainWorkloadTest { + private lateinit var machineModel: MachineModel + + @BeforeEach + fun setUp() { + val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) + + machineModel = MachineModel( + /*cpus*/ List(cpuNode.coreCount) { ProcessingUnit(cpuNode, it, 1000.0) }, + /*memory*/ List(4) { MemoryUnit("Crucial", "MTA18ASF4G72AZ-3G2B1", 3200.0, 32_000) } + ) + } + + @Test + fun testMultipleWorkloads() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } + + @Test + fun testStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(0, clock.millis()) + } + + @Test + fun testStartFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>() + every { workloadA.onStart(any()) } throws IllegalStateException("Staged") + every { workloadA.onStop(any()) } returns Unit + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk<SimWorkload>(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(1000, clock.millis()) + } + + @Test + fun testStopFailureSecond() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = spyk<SimWorkload>(SimRuntimeWorkload(1000, 1.0)) + every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA, + SimRuntimeWorkload(1000, 1.0) + ) + + machine.runWorkload(workload) + + assertEquals(2000, clock.millis()) + } +} |
