diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-31 19:36:45 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-31 19:36:45 +0100 |
| commit | 9d06fb04319a50c26599f2da5387a2d03bee15ec (patch) | |
| tree | 1dd498da02298c9924f6c37974f4311df6948cb2 | |
| parent | b96acc687f59b698fbc4d4c984d77b008cd4051b (diff) | |
| parent | c9750e52a10157f3838b934fed4f04fae69c539a (diff) | |
merge: Support snapshotting simulated workloads (#113)
This pull request adds support for snapshotting simulated workloads in OpenDC, which
serves as the basis for virtual machine migration/suspension support.
Part of #32
## Implementation Notes :hammer_and_pick:
* Support synchronous update of FlowStage
* Report exceptions in onStop as suppressed
* Add support for snapshotting workloads
14 files changed, 398 insertions, 62 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index b3e56f38..5eccc6ec 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -280,7 +280,7 @@ public class SimHost( val bootWorkload = bootModel.get() val hypervisor = hypervisor - val hypervisorWorkload = object : SimWorkload { + val hypervisorWorkload = object : SimWorkload by hypervisor { override fun onStart(ctx: SimMachineContext) { try { _bootTime = clock.instant() @@ -296,10 +296,6 @@ public class SimHost( throw cause } } - - override fun onStop(ctx: SimMachineContext) { - hypervisor.onStop(ctx) - } } val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index a7fc102a..eb308970 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -127,6 +127,8 @@ public class SimTFDevice( output = null } + override fun snapshot(): SimWorkload = throw UnsupportedOperationException() + override fun onUpdate(ctx: FlowStage, now: Long): Long { val output = output ?: return Long.MAX_VALUE val lastPull = lastPull diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index f3f90bb6..0ea0c252 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -32,7 +32,6 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(libs.kotlin.logging) testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index d968d884..f684c54d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -36,14 +36,11 @@ import org.opendc.simulator.flow2.Outlet; import org.opendc.simulator.flow2.sink.SimpleFlowSink; import org.opendc.simulator.flow2.util.FlowTransformer; import org.opendc.simulator.flow2.util.FlowTransforms; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Abstract implementation of the {@link SimMachine} interface. */ public abstract class SimAbstractMachine implements SimMachine { - private static final Logger LOGGER = LoggerFactory.getLogger(SimAbstractMachine.class); private final MachineModel model; private Context activeContext; @@ -108,7 +105,6 @@ public abstract class SimAbstractMachine implements SimMachine { private final Map<String, Object> meta; private final Consumer<Exception> completion; private boolean isClosed; - private Exception cause; /** * Construct a new {@link Context} instance. @@ -135,6 +131,11 @@ public abstract class SimAbstractMachine implements SimMachine { } @Override + public SimWorkload snapshot() { + return workload.snapshot(); + } + + @Override public void reset() { final FlowGraph graph = getMemory().getInput().getGraph(); @@ -158,6 +159,11 @@ public abstract class SimAbstractMachine implements SimMachine { @Override public final void shutdown() { + shutdown(null); + } + + @Override + public final void shutdown(Exception cause) { if (isClosed) { return; } @@ -170,19 +176,17 @@ public abstract class SimAbstractMachine implements SimMachine { // Cancel all the resources associated with the machine doCancel(); - Exception e = this.cause; - try { workload.onStop(this); - } catch (Exception cause) { - if (e != null) { - e.addSuppressed(cause); + } catch (Exception e) { + if (cause == null) { + cause = e; } else { - e = cause; + cause.addSuppressed(e); } } - completion.accept(e); + completion.accept(cause); } /** @@ -193,8 +197,7 @@ public abstract class SimAbstractMachine implements SimMachine { machine.activeContext = this; workload.onStart(this); } catch (Exception cause) { - this.cause = cause; - shutdown(); + shutdown(cause); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java index 5d08e2b7..bce5c0a8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java @@ -24,6 +24,7 @@ package org.opendc.simulator.compute; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.flow2.FlowGraph; @@ -43,7 +44,7 @@ public interface SimMachineContext { /** * Return the metadata associated with the context. * <p> - * Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map)}. + * Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map, Consumer)}. */ Map<String, Object> getMeta(); @@ -68,6 +69,13 @@ public interface SimMachineContext { List<? extends SimStorageInterface> getStorageInterfaces(); /** + * Create a snapshot of the {@link SimWorkload} running on this machine. + * + * @throws UnsupportedOperationException if the workload does not support snapshotting. + */ + SimWorkload snapshot(); + + /** * Reset all resources of the machine. */ void reset(); @@ -76,4 +84,11 @@ public interface SimMachineContext { * Shutdown the workload. */ void shutdown(); + + /** + * Shutdown the workload due to failure. + * + * @param cause The cause for shutting down the workload. + */ + void shutdown(Exception cause); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java index f03a0c20..4ebcba71 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java @@ -240,6 +240,11 @@ public final class SimHypervisor implements SimWorkload { } } + @Override + public SimWorkload snapshot() { + throw new UnsupportedOperationException("Unable to snapshot hypervisor"); + } + /** * The context which carries the state when the hypervisor is running on a machine. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index 9304122a..7480b3c0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -30,15 +30,11 @@ import org.opendc.simulator.compute.SimNetworkInterface; import org.opendc.simulator.compute.SimProcessingUnit; import org.opendc.simulator.compute.SimStorageInterface; import org.opendc.simulator.flow2.FlowGraph; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A {@link SimWorkload} that composes two {@link SimWorkload}s. */ final class SimChainWorkload implements SimWorkload { - private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class); - private final SimWorkload[] workloads; private int activeWorkloadIndex; @@ -48,9 +44,20 @@ final class SimChainWorkload implements SimWorkload { * Construct a {@link SimChainWorkload} instance. * * @param workloads The workloads to chain. + * @param activeWorkloadIndex The index of the active workload. */ - SimChainWorkload(SimWorkload... workloads) { + SimChainWorkload(SimWorkload[] workloads, int activeWorkloadIndex) { this.workloads = workloads; + this.activeWorkloadIndex = activeWorkloadIndex; + } + + /** + * Construct a {@link SimChainWorkload} instance. + * + * @param workloads The workloads to chain. + */ + SimChainWorkload(SimWorkload... workloads) { + this(workloads, 0); } @Override @@ -65,9 +72,7 @@ final class SimChainWorkload implements SimWorkload { final Context context = new Context(ctx); activeContext = context; - if (!context.doStart(workloads[activeWorkloadIndex])) { - ctx.shutdown(); - } + tryThrow(context.doStart(workloads[activeWorkloadIndex])); } @Override @@ -82,7 +87,20 @@ final class SimChainWorkload implements SimWorkload { final Context context = activeContext; activeContext = null; - context.doStop(workloads[activeWorkloadIndex]); + tryThrow(context.doStop(workloads[activeWorkloadIndex])); + } + + @Override + public SimChainWorkload snapshot() { + final int activeWorkloadIndex = this.activeWorkloadIndex; + final SimWorkload[] workloads = this.workloads; + final SimWorkload[] newWorkloads = new SimWorkload[workloads.length - activeWorkloadIndex]; + + for (int i = 0; i < newWorkloads.length; i++) { + newWorkloads[i] = workloads[activeWorkloadIndex + i].snapshot(); + } + + return new SimChainWorkload(newWorkloads, 0); } /** @@ -126,57 +144,92 @@ final class SimChainWorkload implements SimWorkload { } @Override + public SimWorkload snapshot() { + final SimWorkload workload = workloads[activeWorkloadIndex]; + return workload.snapshot(); + } + + @Override public void reset() { ctx.reset(); } @Override public void shutdown() { + shutdown(null); + } + + @Override + public void shutdown(Exception cause) { final SimWorkload[] workloads = SimChainWorkload.this.workloads; final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex; - if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) { + final Exception stopException = doStop(workloads[activeWorkloadIndex - 1]); + if (cause == null) { + cause = stopException; + } else if (stopException != null) { + cause.addSuppressed(stopException); + } + + if (stopException == null && activeWorkloadIndex < workloads.length) { ctx.reset(); - if (doStart(workloads[activeWorkloadIndex])) { + final Exception startException = doStart(workloads[activeWorkloadIndex]); + + if (startException == null) { return; + } else if (cause == null) { + cause = startException; + } else { + cause.addSuppressed(startException); } } - ctx.shutdown(); + ctx.shutdown(cause); } /** * Start the specified workload. * - * @return <code>true</code> if the workload started successfully, <code>false</code> otherwise. + * @return The {@link Exception} that occurred while starting the workload or <code>null</code> if the workload + * started successfully. */ - private boolean doStart(SimWorkload workload) { + private Exception doStart(SimWorkload workload) { try { workload.onStart(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStart callback", cause); - doStop(workload); - return false; + final Exception stopException = doStop(workload); + if (stopException != null) { + cause.addSuppressed(stopException); + } + return cause; } - return true; + return null; } /** * Stop the specified workload. * - * @return <code>true</code> if the workload stopped successfully, <code>false</code> otherwise. + * @return The {@link Exception} that occurred while stopping the workload or <code>null</code> if the workload + * stopped successfully. */ - private boolean doStop(SimWorkload workload) { + private Exception doStop(SimWorkload workload) { try { workload.onStop(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStop callback", cause); - return false; + return cause; } - return true; + return null; + } + } + + @SuppressWarnings("unchecked") + private static <T extends Throwable> void tryThrow(Throwable e) throws T { + if (e == null) { + return; } + throw (T) e; } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java index f3efbebb..839856bb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java @@ -60,6 +60,7 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { this.flops = flops; this.utilization = utilization; + this.remainingAmount = flops; } @Override @@ -98,8 +99,13 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { } @Override - public String toString() { - return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]"; + public SimFlopsWorkload snapshot() { + final FlowStage stage = this.stage; + if (stage != null) { + stage.sync(); + } + + return new SimFlopsWorkload((long) remainingAmount, utilization); } @Override @@ -138,4 +144,9 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { return now + duration; } + + @Override + public String toString() { + return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]"; + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java index 194efafd..9c9f4788 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java @@ -59,6 +59,7 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic { this.duration = duration; this.utilization = utilization; + this.remainingDuration = duration; } @Override @@ -98,6 +99,16 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic { } @Override + public SimRuntimeWorkload snapshot() { + final FlowStage stage = this.stage; + if (stage != null) { + stage.sync(); + } + + return new SimRuntimeWorkload(remainingDuration, utilization); + } + + @Override public long onUpdate(FlowStage ctx, long now) { long lastUpdate = this.lastUpdate; this.lastUpdate = now; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java index 12a567ff..1d8667d5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java @@ -71,7 +71,7 @@ public final class SimTrace { * @param offset The offset for the timestamps. */ public SimWorkload createWorkload(long offset) { - return new Workload(offset, usageCol, deadlineCol, coresCol, size); + return new Workload(offset, usageCol, deadlineCol, coresCol, size, 0); } /** @@ -211,22 +211,24 @@ public final class SimTrace { private final long[] deadlineCol; private final int[] coresCol; private final int size; + private final int index; - private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) { + private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size, int index) { this.offset = offset; this.usageCol = usageCol; this.deadlineCol = deadlineCol; this.coresCol = coresCol; this.size = size; + this.index = index; } @Override public void onStart(SimMachineContext ctx) { final WorkloadStageLogic logic; if (ctx.getCpus().size() == 1) { - logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size); + logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size, index); } else { - logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size); + logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size, index); } this.logic = logic; } @@ -240,6 +242,18 @@ public final class SimTrace { logic.getStage().close(); } } + + @Override + public SimWorkload snapshot() { + final WorkloadStageLogic logic = this.logic; + int index = this.index; + + if (logic != null) { + index = logic.getIndex(); + } + + return new Workload(offset, usageCol, deadlineCol, coresCol, size, index); + } } /** @@ -250,6 +264,11 @@ public final class SimTrace { * Return the {@link FlowStage} belonging to this instance. */ FlowStage getStage(); + + /** + * Return the current index of the workload. + */ + int getIndex(); } /** @@ -268,12 +287,13 @@ public final class SimTrace { private final SimMachineContext ctx; private SingleWorkloadLogic( - SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size) { + SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size, int index) { this.ctx = ctx; this.offset = offset; this.usageCol = usageCol; this.deadlineCol = deadlineCol; this.size = size; + this.index = index; final FlowGraph graph = ctx.getGraph(); final List<? extends SimProcessingUnit> cpus = ctx.getCpus(); @@ -315,6 +335,11 @@ public final class SimTrace { return stage; } + @Override + public int getIndex() { + return index; + } + /** * Helper method to stop the execution of the workload. */ @@ -346,13 +371,20 @@ public final class SimTrace { private final SimMachineContext ctx; private MultiWorkloadLogic( - SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) { + SimMachineContext ctx, + long offset, + double[] usageCol, + long[] deadlineCol, + int[] coresCol, + int size, + int index) { this.ctx = ctx; this.offset = offset; this.usageCol = usageCol; this.deadlineCol = deadlineCol; this.coresCol = coresCol; this.size = size; + this.index = index; final FlowGraph graph = ctx.getGraph(); final List<? extends SimProcessingUnit> cpus = ctx.getCpus(); @@ -418,5 +450,10 @@ public final class SimTrace { public FlowStage getStage() { return stage; } + + @Override + public int getIndex() { + return index; + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index 7be51265..cad324fb 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -45,4 +45,9 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ void onStop(SimMachineContext ctx); + + /** + * Create a snapshot of this workload. + */ + SimWorkload snapshot(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 266839bd..2acf6ec7 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.CancellationException import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope @@ -175,6 +177,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) } @@ -195,6 +199,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) } @@ -215,6 +221,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(1000, clock.millis()) @@ -241,6 +249,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(40, clock.millis()) @@ -264,6 +274,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(4000, clock.millis()) @@ -287,6 +299,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(4000, clock.millis()) @@ -334,4 +348,71 @@ class SimMachineTest { } } } + + @Test + fun testCatchStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk<SimWorkload>() + every { workload.onStart(any()) } throws IllegalStateException() + + assertThrows<IllegalStateException> { machine.runWorkload(workload) } + } + + @Test + fun testCatchStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk<SimWorkload>() + every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown() } + every { workload.onStop(any()) } throws IllegalStateException() + + assertThrows<IllegalStateException> { machine.runWorkload(workload) } + } + + @Test + fun testCatchShutdownFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk<SimWorkload>() + every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + + assertThrows<IllegalStateException> { machine.runWorkload(workload) } + } + + @Test + fun testCatchNestedFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk<SimWorkload>() + every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + every { workload.onStop(any()) } throws IllegalStateException() + + val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) } + assertEquals(1, exc.cause!!.suppressedExceptions.size) + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt index 6bf05f65..d0b0efaa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -25,10 +25,14 @@ package org.opendc.simulator.compute.workload import io.mockk.every import io.mockk.mockk import io.mockk.spyk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -65,8 +69,8 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( - SimRuntimeWorkload(1000, 1.0), - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0), + SimWorkloads.runtime(1000, 1.0) ) machine.runWorkload(workload) @@ -91,10 +95,10 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( workloadA, - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0) ) - machine.runWorkload(workload) + assertThrows<IllegalStateException> { machine.runWorkload(workload) } assertEquals(0, clock.millis()) } @@ -115,12 +119,12 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( - SimRuntimeWorkload(1000, 1.0), + SimWorkloads.runtime(1000, 1.0), workloadA, - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0) ) - machine.runWorkload(workload) + assertThrows<IllegalStateException> { machine.runWorkload(workload) } assertEquals(1000, clock.millis()) } @@ -141,10 +145,10 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( workloadA, - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0) ) - machine.runWorkload(workload) + assertThrows<IllegalStateException> { machine.runWorkload(workload) } assertEquals(1000, clock.millis()) } @@ -164,13 +168,118 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( + SimWorkloads.runtime(1000, 1.0), + workloadA, + SimWorkloads.runtime(1000, 1.0) + ) + + assertThrows<IllegalStateException> { machine.runWorkload(workload) } + + assertEquals(2000, clock.millis()) + } + + @Test + fun testStartAndStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>() + every { workloadA.onStart(any()) } throws IllegalStateException() + every { workloadA.onStop(any()) } throws IllegalStateException() + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA + ) + + val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) } + + assertEquals(2, exc.cause!!.suppressedExceptions.size) + assertEquals(1000, clock.millis()) + } + + @Test + fun testShutdownAndStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>() + every { workloadA.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + every { workloadA.onStop(any()) } throws IllegalStateException() + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA + ) + + val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) } + + assertEquals(1, exc.cause!!.suppressedExceptions.size) + assertEquals(1000, clock.millis()) + } + + @Test + fun testShutdownAndStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk<SimWorkload>(relaxUnitFun = true) + every { workloadA.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + + val workloadB = mockk<SimWorkload>(relaxUnitFun = true) + every { workloadB.onStart(any()) } throws IllegalStateException() + + val workload = + SimWorkloads.chain( SimRuntimeWorkload(1000, 1.0), workloadA, - SimRuntimeWorkload(1000, 1.0) + workloadB ) - machine.runWorkload(workload) + val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) } + assertEquals(1, exc.cause!!.suppressedExceptions.size) + assertEquals(1000, clock.millis()) + } + + @Test + fun testSnapshot() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val workload = + SimWorkloads.chain( + SimWorkloads.runtime(1000, 1.0), + SimWorkloads.runtime(1000, 1.0) + ) + + val job = launch { machine.runWorkload(workload) } + delay(500L) + val snapshot = workload.snapshot() + + job.join() assertEquals(2000, clock.millis()) + + machine.runWorkload(snapshot) + + assertEquals(3500, clock.millis()) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java index 4d098043..ed5579ea 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java @@ -152,6 +152,15 @@ public final class FlowStage { } /** + * Synchronously update the {@link FlowStage} at the current timestamp. + */ + public void sync() { + this.flags |= STAGE_INVALIDATE; + onUpdate(clock.millis()); + engine.scheduleDelayed(this); + } + + /** * Close the {@link FlowStage} and disconnect all inlets and outlets. */ public void close() { |
