summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-31 19:36:45 +0100
committerGitHub <noreply@github.com>2022-10-31 19:36:45 +0100
commit9d06fb04319a50c26599f2da5387a2d03bee15ec (patch)
tree1dd498da02298c9924f6c37974f4311df6948cb2
parentb96acc687f59b698fbc4d4c984d77b008cd4051b (diff)
parentc9750e52a10157f3838b934fed4f04fae69c539a (diff)
merge: Support snapshotting simulated workloads (#113)
This pull request adds support for snapshotting simulated workloads in OpenDC, which serves as the basis for virtual machine migration/suspension support. Part of #32 ## Implementation Notes :hammer_and_pick: * Support synchronous update of FlowStage * Report exceptions in onStop as suppressed * Add support for snapshotting workloads
-rw-r--r--opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt6
-rw-r--r--opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt2
-rw-r--r--opendc-simulator/opendc-simulator-compute/build.gradle.kts1
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java29
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java17
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java99
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java15
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java11
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java49
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java5
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt81
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java9
14 files changed, 398 insertions, 62 deletions
diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index b3e56f38..5eccc6ec 100644
--- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -280,7 +280,7 @@ public class SimHost(
val bootWorkload = bootModel.get()
val hypervisor = hypervisor
- val hypervisorWorkload = object : SimWorkload {
+ val hypervisorWorkload = object : SimWorkload by hypervisor {
override fun onStart(ctx: SimMachineContext) {
try {
_bootTime = clock.instant()
@@ -296,10 +296,6 @@ public class SimHost(
throw cause
}
}
-
- override fun onStop(ctx: SimMachineContext) {
- hypervisor.onStop(ctx)
- }
}
val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload
diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
index a7fc102a..eb308970 100644
--- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
+++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt
@@ -127,6 +127,8 @@ public class SimTFDevice(
output = null
}
+ override fun snapshot(): SimWorkload = throw UnsupportedOperationException()
+
override fun onUpdate(ctx: FlowStage, now: Long): Long {
val output = output ?: return Long.MAX_VALUE
val lastPull = lastPull
diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
index f3f90bb6..0ea0c252 100644
--- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts
+++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts
@@ -32,7 +32,6 @@ dependencies {
api(projects.opendcSimulator.opendcSimulatorPower)
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
- implementation(libs.kotlin.logging)
testImplementation(libs.slf4j.simple)
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java
index d968d884..f684c54d 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java
@@ -36,14 +36,11 @@ import org.opendc.simulator.flow2.Outlet;
import org.opendc.simulator.flow2.sink.SimpleFlowSink;
import org.opendc.simulator.flow2.util.FlowTransformer;
import org.opendc.simulator.flow2.util.FlowTransforms;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Abstract implementation of the {@link SimMachine} interface.
*/
public abstract class SimAbstractMachine implements SimMachine {
- private static final Logger LOGGER = LoggerFactory.getLogger(SimAbstractMachine.class);
private final MachineModel model;
private Context activeContext;
@@ -108,7 +105,6 @@ public abstract class SimAbstractMachine implements SimMachine {
private final Map<String, Object> meta;
private final Consumer<Exception> completion;
private boolean isClosed;
- private Exception cause;
/**
* Construct a new {@link Context} instance.
@@ -135,6 +131,11 @@ public abstract class SimAbstractMachine implements SimMachine {
}
@Override
+ public SimWorkload snapshot() {
+ return workload.snapshot();
+ }
+
+ @Override
public void reset() {
final FlowGraph graph = getMemory().getInput().getGraph();
@@ -158,6 +159,11 @@ public abstract class SimAbstractMachine implements SimMachine {
@Override
public final void shutdown() {
+ shutdown(null);
+ }
+
+ @Override
+ public final void shutdown(Exception cause) {
if (isClosed) {
return;
}
@@ -170,19 +176,17 @@ public abstract class SimAbstractMachine implements SimMachine {
// Cancel all the resources associated with the machine
doCancel();
- Exception e = this.cause;
-
try {
workload.onStop(this);
- } catch (Exception cause) {
- if (e != null) {
- e.addSuppressed(cause);
+ } catch (Exception e) {
+ if (cause == null) {
+ cause = e;
} else {
- e = cause;
+ cause.addSuppressed(e);
}
}
- completion.accept(e);
+ completion.accept(cause);
}
/**
@@ -193,8 +197,7 @@ public abstract class SimAbstractMachine implements SimMachine {
machine.activeContext = this;
workload.onStart(this);
} catch (Exception cause) {
- this.cause = cause;
- shutdown();
+ shutdown(cause);
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java
index 5d08e2b7..bce5c0a8 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java
@@ -24,6 +24,7 @@ package org.opendc.simulator.compute;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.opendc.simulator.compute.workload.SimWorkload;
import org.opendc.simulator.flow2.FlowGraph;
@@ -43,7 +44,7 @@ public interface SimMachineContext {
/**
* Return the metadata associated with the context.
* <p>
- * Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map)}.
+ * Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map, Consumer)}.
*/
Map<String, Object> getMeta();
@@ -68,6 +69,13 @@ public interface SimMachineContext {
List<? extends SimStorageInterface> getStorageInterfaces();
/**
+ * Create a snapshot of the {@link SimWorkload} running on this machine.
+ *
+ * @throws UnsupportedOperationException if the workload does not support snapshotting.
+ */
+ SimWorkload snapshot();
+
+ /**
* Reset all resources of the machine.
*/
void reset();
@@ -76,4 +84,11 @@ public interface SimMachineContext {
* Shutdown the workload.
*/
void shutdown();
+
+ /**
+ * Shutdown the workload due to failure.
+ *
+ * @param cause The cause for shutting down the workload.
+ */
+ void shutdown(Exception cause);
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
index f03a0c20..4ebcba71 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java
@@ -240,6 +240,11 @@ public final class SimHypervisor implements SimWorkload {
}
}
+ @Override
+ public SimWorkload snapshot() {
+ throw new UnsupportedOperationException("Unable to snapshot hypervisor");
+ }
+
/**
* The context which carries the state when the hypervisor is running on a machine.
*/
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
index 9304122a..7480b3c0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java
@@ -30,15 +30,11 @@ import org.opendc.simulator.compute.SimNetworkInterface;
import org.opendc.simulator.compute.SimProcessingUnit;
import org.opendc.simulator.compute.SimStorageInterface;
import org.opendc.simulator.flow2.FlowGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A {@link SimWorkload} that composes two {@link SimWorkload}s.
*/
final class SimChainWorkload implements SimWorkload {
- private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class);
-
private final SimWorkload[] workloads;
private int activeWorkloadIndex;
@@ -48,9 +44,20 @@ final class SimChainWorkload implements SimWorkload {
* Construct a {@link SimChainWorkload} instance.
*
* @param workloads The workloads to chain.
+ * @param activeWorkloadIndex The index of the active workload.
*/
- SimChainWorkload(SimWorkload... workloads) {
+ SimChainWorkload(SimWorkload[] workloads, int activeWorkloadIndex) {
this.workloads = workloads;
+ this.activeWorkloadIndex = activeWorkloadIndex;
+ }
+
+ /**
+ * Construct a {@link SimChainWorkload} instance.
+ *
+ * @param workloads The workloads to chain.
+ */
+ SimChainWorkload(SimWorkload... workloads) {
+ this(workloads, 0);
}
@Override
@@ -65,9 +72,7 @@ final class SimChainWorkload implements SimWorkload {
final Context context = new Context(ctx);
activeContext = context;
- if (!context.doStart(workloads[activeWorkloadIndex])) {
- ctx.shutdown();
- }
+ tryThrow(context.doStart(workloads[activeWorkloadIndex]));
}
@Override
@@ -82,7 +87,20 @@ final class SimChainWorkload implements SimWorkload {
final Context context = activeContext;
activeContext = null;
- context.doStop(workloads[activeWorkloadIndex]);
+ tryThrow(context.doStop(workloads[activeWorkloadIndex]));
+ }
+
+ @Override
+ public SimChainWorkload snapshot() {
+ final int activeWorkloadIndex = this.activeWorkloadIndex;
+ final SimWorkload[] workloads = this.workloads;
+ final SimWorkload[] newWorkloads = new SimWorkload[workloads.length - activeWorkloadIndex];
+
+ for (int i = 0; i < newWorkloads.length; i++) {
+ newWorkloads[i] = workloads[activeWorkloadIndex + i].snapshot();
+ }
+
+ return new SimChainWorkload(newWorkloads, 0);
}
/**
@@ -126,57 +144,92 @@ final class SimChainWorkload implements SimWorkload {
}
@Override
+ public SimWorkload snapshot() {
+ final SimWorkload workload = workloads[activeWorkloadIndex];
+ return workload.snapshot();
+ }
+
+ @Override
public void reset() {
ctx.reset();
}
@Override
public void shutdown() {
+ shutdown(null);
+ }
+
+ @Override
+ public void shutdown(Exception cause) {
final SimWorkload[] workloads = SimChainWorkload.this.workloads;
final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex;
- if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) {
+ final Exception stopException = doStop(workloads[activeWorkloadIndex - 1]);
+ if (cause == null) {
+ cause = stopException;
+ } else if (stopException != null) {
+ cause.addSuppressed(stopException);
+ }
+
+ if (stopException == null && activeWorkloadIndex < workloads.length) {
ctx.reset();
- if (doStart(workloads[activeWorkloadIndex])) {
+ final Exception startException = doStart(workloads[activeWorkloadIndex]);
+
+ if (startException == null) {
return;
+ } else if (cause == null) {
+ cause = startException;
+ } else {
+ cause.addSuppressed(startException);
}
}
- ctx.shutdown();
+ ctx.shutdown(cause);
}
/**
* Start the specified workload.
*
- * @return <code>true</code> if the workload started successfully, <code>false</code> otherwise.
+ * @return The {@link Exception} that occurred while starting the workload or <code>null</code> if the workload
+ * started successfully.
*/
- private boolean doStart(SimWorkload workload) {
+ private Exception doStart(SimWorkload workload) {
try {
workload.onStart(this);
} catch (Exception cause) {
- LOGGER.warn("Workload failed during onStart callback", cause);
- doStop(workload);
- return false;
+ final Exception stopException = doStop(workload);
+ if (stopException != null) {
+ cause.addSuppressed(stopException);
+ }
+ return cause;
}
- return true;
+ return null;
}
/**
* Stop the specified workload.
*
- * @return <code>true</code> if the workload stopped successfully, <code>false</code> otherwise.
+ * @return The {@link Exception} that occurred while stopping the workload or <code>null</code> if the workload
+ * stopped successfully.
*/
- private boolean doStop(SimWorkload workload) {
+ private Exception doStop(SimWorkload workload) {
try {
workload.onStop(this);
} catch (Exception cause) {
- LOGGER.warn("Workload failed during onStop callback", cause);
- return false;
+ return cause;
}
- return true;
+ return null;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T extends Throwable> void tryThrow(Throwable e) throws T {
+ if (e == null) {
+ return;
}
+ throw (T) e;
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java
index f3efbebb..839856bb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java
@@ -60,6 +60,7 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic {
this.flops = flops;
this.utilization = utilization;
+ this.remainingAmount = flops;
}
@Override
@@ -98,8 +99,13 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic {
}
@Override
- public String toString() {
- return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]";
+ public SimFlopsWorkload snapshot() {
+ final FlowStage stage = this.stage;
+ if (stage != null) {
+ stage.sync();
+ }
+
+ return new SimFlopsWorkload((long) remainingAmount, utilization);
}
@Override
@@ -138,4 +144,9 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic {
return now + duration;
}
+
+ @Override
+ public String toString() {
+ return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]";
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java
index 194efafd..9c9f4788 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java
@@ -59,6 +59,7 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic {
this.duration = duration;
this.utilization = utilization;
+ this.remainingDuration = duration;
}
@Override
@@ -98,6 +99,16 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic {
}
@Override
+ public SimRuntimeWorkload snapshot() {
+ final FlowStage stage = this.stage;
+ if (stage != null) {
+ stage.sync();
+ }
+
+ return new SimRuntimeWorkload(remainingDuration, utilization);
+ }
+
+ @Override
public long onUpdate(FlowStage ctx, long now) {
long lastUpdate = this.lastUpdate;
this.lastUpdate = now;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java
index 12a567ff..1d8667d5 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java
@@ -71,7 +71,7 @@ public final class SimTrace {
* @param offset The offset for the timestamps.
*/
public SimWorkload createWorkload(long offset) {
- return new Workload(offset, usageCol, deadlineCol, coresCol, size);
+ return new Workload(offset, usageCol, deadlineCol, coresCol, size, 0);
}
/**
@@ -211,22 +211,24 @@ public final class SimTrace {
private final long[] deadlineCol;
private final int[] coresCol;
private final int size;
+ private final int index;
- private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) {
+ private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size, int index) {
this.offset = offset;
this.usageCol = usageCol;
this.deadlineCol = deadlineCol;
this.coresCol = coresCol;
this.size = size;
+ this.index = index;
}
@Override
public void onStart(SimMachineContext ctx) {
final WorkloadStageLogic logic;
if (ctx.getCpus().size() == 1) {
- logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size);
+ logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size, index);
} else {
- logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size);
+ logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size, index);
}
this.logic = logic;
}
@@ -240,6 +242,18 @@ public final class SimTrace {
logic.getStage().close();
}
}
+
+ @Override
+ public SimWorkload snapshot() {
+ final WorkloadStageLogic logic = this.logic;
+ int index = this.index;
+
+ if (logic != null) {
+ index = logic.getIndex();
+ }
+
+ return new Workload(offset, usageCol, deadlineCol, coresCol, size, index);
+ }
}
/**
@@ -250,6 +264,11 @@ public final class SimTrace {
* Return the {@link FlowStage} belonging to this instance.
*/
FlowStage getStage();
+
+ /**
+ * Return the current index of the workload.
+ */
+ int getIndex();
}
/**
@@ -268,12 +287,13 @@ public final class SimTrace {
private final SimMachineContext ctx;
private SingleWorkloadLogic(
- SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size) {
+ SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size, int index) {
this.ctx = ctx;
this.offset = offset;
this.usageCol = usageCol;
this.deadlineCol = deadlineCol;
this.size = size;
+ this.index = index;
final FlowGraph graph = ctx.getGraph();
final List<? extends SimProcessingUnit> cpus = ctx.getCpus();
@@ -315,6 +335,11 @@ public final class SimTrace {
return stage;
}
+ @Override
+ public int getIndex() {
+ return index;
+ }
+
/**
* Helper method to stop the execution of the workload.
*/
@@ -346,13 +371,20 @@ public final class SimTrace {
private final SimMachineContext ctx;
private MultiWorkloadLogic(
- SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) {
+ SimMachineContext ctx,
+ long offset,
+ double[] usageCol,
+ long[] deadlineCol,
+ int[] coresCol,
+ int size,
+ int index) {
this.ctx = ctx;
this.offset = offset;
this.usageCol = usageCol;
this.deadlineCol = deadlineCol;
this.coresCol = coresCol;
this.size = size;
+ this.index = index;
final FlowGraph graph = ctx.getGraph();
final List<? extends SimProcessingUnit> cpus = ctx.getCpus();
@@ -418,5 +450,10 @@ public final class SimTrace {
public FlowStage getStage() {
return stage;
}
+
+ @Override
+ public int getIndex() {
+ return index;
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
index 7be51265..cad324fb 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
@@ -45,4 +45,9 @@ public interface SimWorkload {
* @param ctx The execution context in which the machine runs.
*/
void onStop(SimMachineContext ctx);
+
+ /**
+ * Create a snapshot of this workload.
+ */
+ SimWorkload snapshot();
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
index 266839bd..2acf6ec7 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt
@@ -22,6 +22,8 @@
package org.opendc.simulator.compute
+import io.mockk.every
+import io.mockk.mockk
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
@@ -175,6 +177,8 @@ class SimMachineTest {
}
override fun onStop(ctx: SimMachineContext) {}
+
+ override fun snapshot(): SimWorkload = TODO()
})
}
@@ -195,6 +199,8 @@ class SimMachineTest {
}
override fun onStop(ctx: SimMachineContext) {}
+
+ override fun snapshot(): SimWorkload = TODO()
})
}
@@ -215,6 +221,8 @@ class SimMachineTest {
}
override fun onStop(ctx: SimMachineContext) {}
+
+ override fun snapshot(): SimWorkload = TODO()
})
assertEquals(1000, clock.millis())
@@ -241,6 +249,8 @@ class SimMachineTest {
}
override fun onStop(ctx: SimMachineContext) {}
+
+ override fun snapshot(): SimWorkload = TODO()
})
assertEquals(40, clock.millis())
@@ -264,6 +274,8 @@ class SimMachineTest {
}
override fun onStop(ctx: SimMachineContext) {}
+
+ override fun snapshot(): SimWorkload = TODO()
})
assertEquals(4000, clock.millis())
@@ -287,6 +299,8 @@ class SimMachineTest {
}
override fun onStop(ctx: SimMachineContext) {}
+
+ override fun snapshot(): SimWorkload = TODO()
})
assertEquals(4000, clock.millis())
@@ -334,4 +348,71 @@ class SimMachineTest {
}
}
}
+
+ @Test
+ fun testCatchStartFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workload = mockk<SimWorkload>()
+ every { workload.onStart(any()) } throws IllegalStateException()
+
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+ }
+
+ @Test
+ fun testCatchStopFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workload = mockk<SimWorkload>()
+ every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown() }
+ every { workload.onStop(any()) } throws IllegalStateException()
+
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+ }
+
+ @Test
+ fun testCatchShutdownFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workload = mockk<SimWorkload>()
+ every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) }
+
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+ }
+
+ @Test
+ fun testCatchNestedFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workload = mockk<SimWorkload>()
+ every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) }
+ every { workload.onStop(any()) } throws IllegalStateException()
+
+ val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+ assertEquals(1, exc.cause!!.suppressedExceptions.size)
+ }
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
index 6bf05f65..d0b0efaa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt
@@ -25,10 +25,14 @@ package org.opendc.simulator.compute.workload
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.compute.SimBareMetalMachine
+import org.opendc.simulator.compute.SimMachineContext
import org.opendc.simulator.compute.model.MachineModel
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingNode
@@ -65,8 +69,8 @@ class SimChainWorkloadTest {
val workload =
SimWorkloads.chain(
- SimRuntimeWorkload(1000, 1.0),
- SimRuntimeWorkload(1000, 1.0)
+ SimWorkloads.runtime(1000, 1.0),
+ SimWorkloads.runtime(1000, 1.0)
)
machine.runWorkload(workload)
@@ -91,10 +95,10 @@ class SimChainWorkloadTest {
val workload =
SimWorkloads.chain(
workloadA,
- SimRuntimeWorkload(1000, 1.0)
+ SimWorkloads.runtime(1000, 1.0)
)
- machine.runWorkload(workload)
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(0, clock.millis())
}
@@ -115,12 +119,12 @@ class SimChainWorkloadTest {
val workload =
SimWorkloads.chain(
- SimRuntimeWorkload(1000, 1.0),
+ SimWorkloads.runtime(1000, 1.0),
workloadA,
- SimRuntimeWorkload(1000, 1.0)
+ SimWorkloads.runtime(1000, 1.0)
)
- machine.runWorkload(workload)
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(1000, clock.millis())
}
@@ -141,10 +145,10 @@ class SimChainWorkloadTest {
val workload =
SimWorkloads.chain(
workloadA,
- SimRuntimeWorkload(1000, 1.0)
+ SimWorkloads.runtime(1000, 1.0)
)
- machine.runWorkload(workload)
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
assertEquals(1000, clock.millis())
}
@@ -164,13 +168,118 @@ class SimChainWorkloadTest {
val workload =
SimWorkloads.chain(
+ SimWorkloads.runtime(1000, 1.0),
+ workloadA,
+ SimWorkloads.runtime(1000, 1.0)
+ )
+
+ assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testStartAndStopFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = mockk<SimWorkload>()
+ every { workloadA.onStart(any()) } throws IllegalStateException()
+ every { workloadA.onStop(any()) } throws IllegalStateException()
+
+ val workload =
+ SimWorkloads.chain(
+ SimRuntimeWorkload(1000, 1.0),
+ workloadA
+ )
+
+ val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+
+ assertEquals(2, exc.cause!!.suppressedExceptions.size)
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testShutdownAndStopFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = mockk<SimWorkload>()
+ every { workloadA.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) }
+ every { workloadA.onStop(any()) } throws IllegalStateException()
+
+ val workload =
+ SimWorkloads.chain(
+ SimRuntimeWorkload(1000, 1.0),
+ workloadA
+ )
+
+ val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+
+ assertEquals(1, exc.cause!!.suppressedExceptions.size)
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testShutdownAndStartFailure() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(
+ graph,
+ machineModel
+ )
+
+ val workloadA = mockk<SimWorkload>(relaxUnitFun = true)
+ every { workloadA.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) }
+
+ val workloadB = mockk<SimWorkload>(relaxUnitFun = true)
+ every { workloadB.onStart(any()) } throws IllegalStateException()
+
+ val workload =
+ SimWorkloads.chain(
SimRuntimeWorkload(1000, 1.0),
workloadA,
- SimRuntimeWorkload(1000, 1.0)
+ workloadB
)
- machine.runWorkload(workload)
+ val exc = assertThrows<IllegalStateException> { machine.runWorkload(workload) }
+ assertEquals(1, exc.cause!!.suppressedExceptions.size)
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testSnapshot() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val machine = SimBareMetalMachine.create(graph, machineModel)
+ val workload =
+ SimWorkloads.chain(
+ SimWorkloads.runtime(1000, 1.0),
+ SimWorkloads.runtime(1000, 1.0)
+ )
+
+ val job = launch { machine.runWorkload(workload) }
+ delay(500L)
+ val snapshot = workload.snapshot()
+
+ job.join()
assertEquals(2000, clock.millis())
+
+ machine.runWorkload(snapshot)
+
+ assertEquals(3500, clock.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
index 4d098043..ed5579ea 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
@@ -152,6 +152,15 @@ public final class FlowStage {
}
/**
+ * Synchronously update the {@link FlowStage} at the current timestamp.
+ */
+ public void sync() {
+ this.flags |= STAGE_INVALIDATE;
+ onUpdate(clock.millis());
+ engine.scheduleDelayed(this);
+ }
+
+ /**
* Close the {@link FlowStage} and disconnect all inlets and outlets.
*/
public void close() {