summary | refs | log | tree | commit | diff
path: root/opendc-simulator/opendc-simulator-compute/src
diff options
context:
space:
mode:
author: Dante Niewenhuis <d.niewenhuis@hotmail.com> 2025-11-13 19:44:43 +0100
committer: GitHub <noreply@github.com> 2025-11-13 19:44:43 +0100
commit: 90d4369183a420689fb1d48687a77ec677572433 (patch)
tree: 6b5bf9dbc1c55700620073a099ed22690e799526 /opendc-simulator/opendc-simulator-compute/src
parent: 71f63618fb83c8e19ae48d5dc4a6e3927031cc10 (diff)
Updated the checkpointModel and OnUpdate of SimTraceWorkload (#380)
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src')
-rw-r--r-- opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java | 20
-rw-r--r-- opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java | 2
-rw-r--r-- opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java | 4
-rw-r--r-- opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java | 275
-rw-r--r-- opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java | 12
5 files changed, 210 insertions, 103 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java
index 56e6093b..cc3e0cfa 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java
@@ -35,6 +35,26 @@ public record ChainWorkload(
double checkpointIntervalScaling)
implements Workload {
+ public long failureDelay() {
+ long duration_saved = 0L;
+
+ for (Workload wl : workloads) {
+ duration_saved += wl.failureDelay();
+ }
+
+ return duration_saved;
+ }
+
+ public long checkpointDelay() {
+ long duration_added = 0L;
+
+ for (Workload wl : workloads) {
+ duration_added += wl.checkpointDelay();
+ }
+
+ return duration_added;
+ }
+
public void removeWorkloads(int numberOfWorkloads) {
if (numberOfWorkloads <= 0) {
return;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java
index d34da203..2027454a 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java
@@ -37,7 +37,7 @@ public class CheckpointModel extends FlowNode {
private SimWorkload simWorkload;
private long checkpointInterval;
private final long checkpointDuration;
- private double checkpointIntervalScaling;
+ private final double checkpointIntervalScaling;
private long startOfInterval;
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java
index 5edacb3b..3fcad483 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java
@@ -35,6 +35,10 @@ public interface Workload {
double checkpointIntervalScaling();
+ long failureDelay();
+
+ long checkpointDelay();
+
SimWorkload startWorkload(FlowSupplier supplier);
SimWorkload startWorkload(List<FlowSupplier> supplier, SimMachine machine, Consumer<Exception> completion);
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
index ff65fbf2..5e8744d0 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
@@ -47,21 +47,28 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
private TraceFragment currentFragment;
private long startOfFragment;
+ // The resources used by this workload and the edges to the components
+ private final ArrayList<ResourceType> usedResourceTypes = new ArrayList<>();
private final FlowEdge[] machineResourceEdges = new FlowEdge[ResourceType.values().length];
- // TODO: Currently GPU memory is not considered and can not be used
- private final ArrayList<ResourceType> usedResourceTypes = new ArrayList<>();
+ // The currently supplied resources per resource type
+ private final double[] resourcesSupplied = new double[ResourceType.values().length];
+
+ // The demands per resource type
+ private final double[] resourcesDemand = new double[ResourceType.values().length];
+
+ // The remaining work of the current fragment per resource type (depends on the scaling policy)
+ private final double[] remainingWork = new double[ResourceType.values().length];
- private final double[] resourcesSupplied =
- new double[ResourceType.values().length]; // the currently supplied resources
- private final double[] resourcesDemand = new double[ResourceType.values().length]; // The demands per resource type
- private final double[] remainingWork =
- new double[ResourceType.values().length]; // The duration of the fragment at the demanded speeds
- private double totalRemainingWork =
- 0.0; // The total remaining work of the fragment across all resources, used to determine the end of the
+ // The remaining time for each resource type
+ private final long[] remainingTime = new long[ResourceType.values().length];
+
+ // Finished resources for the current fragment (Only relevant when multiple resource types are used)
+ private final boolean[] resourceFinished = new boolean[ResourceType.values().length];
+
+ // The total remaining work of the fragment across all resources, used to determine the end of the
// fragment
- private final boolean[] workloadFinished =
- new boolean[ResourceType.values().length]; // The workload finished for each resource type
+ private double totalRemainingWork = 0.0;
private final long checkpointDuration;
private final TraceWorkload snapshot;
@@ -96,6 +103,14 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
return 0;
}
+ public long getFailureDelay() {
+ return this.snapshot.failureDelay;
+ }
+
+ public long getCheckpointDelay() {
+ return this.snapshot.checkpointDelay;
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Constructors
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -145,16 +160,24 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
// Fragment related functionality
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- public boolean isWorkloadFinished() {
+ /**
+ * Check if all resources have finished their work for the current fragment
+ */
+ public boolean getAllResourcesFinished() {
for (ResourceType resourceType : this.usedResourceTypes) {
- if (!this.workloadFinished[resourceType.ordinal()]) {
+ if (!this.resourceFinished[resourceType.ordinal()]) {
return false;
}
}
return true;
}
- // Update the remaining work for all resources based on the time passed since last update
+ /**
+ * Use the ScalingPolicy, time since the last update, demands and supplied resources
+ * to update the remaining work for each resource
+ *
+ * @param passedTime Time passed since the last update in milliseconds
+ */
private void updateRemainingWork(long passedTime) {
for (ResourceType resourceType : this.usedResourceTypes) {
// The amount of work done since last update
@@ -163,60 +186,68 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.resourcesSupplied[resourceType.ordinal()],
passedTime);
- // TODO: maybe remove Math.max, as as we are already checking for <= 0
this.remainingWork[resourceType.ordinal()] =
Math.max(0, this.remainingWork[resourceType.ordinal()] - finishedWork);
this.totalRemainingWork -= finishedWork;
if (this.remainingWork[resourceType.ordinal()] <= 0) {
- this.workloadFinished[resourceType.ordinal()] = true;
+ this.resourceFinished[resourceType.ordinal()] = true;
}
}
}
- // Determine the next update time based on the remaining work and supplied resources
- private long getNextUpdateTime(long now) {
- long timeUntilNextUpdate = Long.MIN_VALUE;
-
+ /**
+ * Update the remaining time for each resource using the ScalingPolicy, remaining work and supplied resources
+ */
+ private void updateRemainingTime() {
for (ResourceType resourceType : this.usedResourceTypes) {
- // The amount of time required to finish the fragment at this speed
- long remainingDuration = this.scalingPolicy.getRemainingDuration(
+ this.remainingTime[resourceType.ordinal()] = this.scalingPolicy.getRemainingDuration(
this.resourcesDemand[resourceType.ordinal()],
this.resourcesSupplied[resourceType.ordinal()],
this.remainingWork[resourceType.ordinal()]);
+ }
+ }
- if ((int) remainingDuration == 0) {
- // if resource not initialized, then nothing happens
- if (this.remainingWork[resourceType.ordinal()] >= 0.0) {
- this.totalRemainingWork -= this.remainingWork[resourceType.ordinal()];
- }
- this.remainingWork[resourceType.ordinal()] = 0.0;
- this.workloadFinished[resourceType.ordinal()] = true;
- }
-
- // The next update should happen when the fastest resource is done, so that it is no longer tracked when
- // unused
- if (remainingDuration > 0
- && (timeUntilNextUpdate == Long.MIN_VALUE || remainingDuration < timeUntilNextUpdate)) {
- timeUntilNextUpdate = remainingDuration;
- }
+ /**
+ * Get the next update time based on the remaining time of each resource
+ * The next update time is when the fastest resource that is not yet finished will finish
+ *
+ * @param now Current time in milliseconds
+ * @return The next update time in milliseconds
+ */
+ private long getNextUpdateTime(long now) {
+ if (this.getAllResourcesFinished()) {
+ return now;
}
- return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate;
- }
+ long timeUntilNextUpdate = Long.MAX_VALUE;
- private void pushNewDemands() {
for (ResourceType resourceType : this.usedResourceTypes) {
- if (this.machineResourceEdges[resourceType.ordinal()] != null) {
- this.pushOutgoingDemand(
- this.machineResourceEdges[resourceType.ordinal()],
- this.resourcesDemand[resourceType.ordinal()],
- resourceType);
+ long remainingTime = this.remainingTime[resourceType.ordinal()];
+
+ // The next update should happen when the fastest resource is done
+ if (!this.resourceFinished[resourceType.ordinal()] && remainingTime < timeUntilNextUpdate) {
+ timeUntilNextUpdate = remainingTime;
}
}
+
+ return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate;
}
+ /**
+ * Handle an update event for this workload
+ * <p>
+ * There are three possible scenarios:
+ * <ol>
+ * <li>The fragment is completed across all resources: start the next fragment and call onUpdate again</li>
+ * <li>The fragment is not yet completed: push new demands and update remaining time</li>
+ * <li>The workload is completed: stop the workload</li>
+ * </ol>
+ *
+ * @param now The virtual timestamp in milliseconds at which the update is occurring.
+ * @return The next update time in milliseconds.
+ */
@Override
public long onUpdate(long now) {
long passedTime = getPassedTime(now);
@@ -225,28 +256,27 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.updateRemainingWork(passedTime);
// If this.totalRemainingWork <= 0, the fragment has been completed across all resources
- if ((int) this.totalRemainingWork <= 0 && this.isWorkloadFinished()) {
+ if ((int) this.totalRemainingWork <= 0) {
this.startNextFragment();
if (this.nodeState == NodeState.CLOSING || this.nodeState == NodeState.CLOSED) {
return Long.MAX_VALUE;
}
- return getNextUpdateTime(this.startOfFragment);
+ return this.onUpdate(now);
}
this.pushNewDemands();
+ this.updateRemainingTime();
- long nextUpdate = getNextUpdateTime(this.startOfFragment);
-
- // if for all resources the remaining work is 0, then invalidate the workload, to reschedule the next fragment
- if (nextUpdate == now + Long.MIN_VALUE) {
- this.invalidate();
- return Long.MAX_VALUE;
- }
- return nextUpdate;
+ return getNextUpdateTime(this.startOfFragment);
}
+ /**
+ * Get the next fragment to be executed
+ *
+ * @return The next TraceFragment or null if there are no more fragments
+ */
public TraceFragment getNextFragment() {
if (this.remainingFragments.isEmpty()) {
return null;
@@ -257,8 +287,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
return this.currentFragment;
}
+ /**
+ * Start the next fragment by resetting the remaining work and pushing new demands to the VM
+ *
+ * If no more fragments are left, stopWorkload is called.
+ */
private void startNextFragment() {
-
TraceFragment nextFragment = this.getNextFragment();
if (nextFragment == null) {
this.stopWorkload();
@@ -274,7 +308,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.remainingWork[resourceType.ordinal()] =
this.scalingPolicy.getRemainingWork(demand, nextFragment.duration());
this.totalRemainingWork += this.remainingWork[resourceType.ordinal()];
- this.workloadFinished[resourceType.ordinal()] = false;
+ this.resourceFinished[resourceType.ordinal()] = false;
if (this.machineResourceEdges[resourceType.ordinal()] != null) {
this.pushOutgoingDemand(this.machineResourceEdges[resourceType.ordinal()], demand, resourceType);
@@ -282,20 +316,46 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
}
}
+ /**
+ * Stop the workload and clean up resources. All connected edges are notified.
+ * <p>
+ * stopWorkload can be called in two scenarios:
+ * <ol>
+ * <li>The workload has completed successfully</li>
+ * <li>The workload is stopped because the task is paused or the host has failed</li>
+ * </ol>
+ * <p>
+ * On failure, the wasted time is calculated for bookkeeping purposes.
+ */
@Override
public void stopWorkload() {
+
+ // If the workload is stopped due to an error or failure, calculate the wasted time for bookkeeping.
+ if (this.totalRemainingWork > 0.0 || !this.remainingFragments.isEmpty()) {
+ // Failure
+
+ this.updateRemainingWork(this.clock.millis() - this.startOfFragment);
+
+ for (int i = 0; i < this.fragmentIndex; i++) {
+ this.snapshot.failureDelay +=
+ this.snapshot.getFragments().get(i).duration();
+ }
+ this.snapshot.failureDelay -= (long) this.totalRemainingWork;
+ }
+
+ // The workload has already been stopped
if (areAllEdgesNull()) {
return;
}
// TODO: Maybe move this to the end
- // Currently stopWorkload is called twice
this.closeNode();
for (ResourceType resourceType : this.usedResourceTypes) {
this.machineResourceEdges[resourceType.ordinal()] = null;
- this.workloadFinished[resourceType.ordinal()] = true;
+ this.resourceFinished[resourceType.ordinal()] = true;
}
+
this.remainingFragments = null;
this.currentFragment = null;
}
@@ -306,63 +366,55 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
/**
* SimTraceWorkload does not make a checkpoint, checkpointing is handled by SimChainWorkload
- * TODO: Maybe add checkpoint models for SimTraceWorkload
*/
@Override
public void createCheckpointModel() {}
/**
- * Create a new snapshot based on the current status of the workload.
- * @param now Moment on which the snapshot is made in milliseconds
+ * Create a snapshot of the current state of the workload
+ * <p>
+ * First calculate the remaining work of the current fragment based on the time passed since the last update.
+ * Then, create a new fragment based on the current fragment and the remaining work.
+ * Finally, update the snapshot and the remaining fragments.
+ * <p>
+ * The snapshot contains all remaining fragments, with the current fragment adjusted to only include the remaining
+ * work.
+ *
+ * @param now Current time in milliseconds
*/
public void makeSnapshot(long now) {
-
- // Check if fragments is empty
-
- // Get remaining time of current fragment
long passedTime = getPassedTime(now);
+ this.startOfFragment = now;
- // The amount of work done since last update
- for (ResourceType resourceType : this.usedResourceTypes) {
- double finishedWork = this.scalingPolicy.getFinishedWork(
- this.resourcesDemand[resourceType.ordinal()],
- this.resourcesSupplied[resourceType.ordinal()],
- passedTime);
- this.remainingWork[resourceType.ordinal()] = this.remainingWork[resourceType.ordinal()] - finishedWork;
- this.totalRemainingWork -= finishedWork;
- }
-
- long remainingDuration = 0;
- for (ResourceType resourceType : this.usedResourceTypes) {
+ this.updateRemainingWork(passedTime);
+ this.updateRemainingTime();
- // The amount of time required to finish the fragment at this speed
- remainingDuration = Math.max(
- remainingDuration,
- this.scalingPolicy.getRemainingDuration(
- this.resourcesDemand[resourceType.ordinal()],
- this.resourcesSupplied[resourceType.ordinal()],
- this.remainingWork[resourceType.ordinal()]));
- }
+ long remainingDuration = Arrays.stream(this.remainingTime).max().orElseThrow();
// If this is the end of the Task, don't make a snapshot
if (this.currentFragment == null || (remainingDuration <= 0 && remainingFragments.isEmpty())) {
return;
}
- // Create a new fragment based on the current fragment and remaining duration
- TraceFragment newFragment = new TraceFragment(
- remainingDuration,
- currentFragment.cpuUsage(),
- currentFragment.gpuUsage(),
- currentFragment.gpuMemoryUsage());
-
- // Alter the snapshot by removing finished fragments
+ // Remove all fragments up to and including the current fragment from the snapshot
+ // These fragments are covered by the checkpoint and will not have to be re-executed after a failure
this.snapshot.removeFragments(this.fragmentIndex);
- this.snapshot.addFirst(newFragment);
- this.remainingFragments.addFirst(newFragment);
+ // Create a new fragment with the same resource usage as the current fragment,
+ // but with the remaining duration. Put the adjusted fragment at the front of the
+ // remaining fragments and snapshot
+ if (remainingDuration > 0) {
+ TraceFragment adjustedFragment = new TraceFragment(
+ remainingDuration,
+ currentFragment.cpuUsage(),
+ currentFragment.gpuUsage(),
+ currentFragment.gpuMemoryUsage());
+
+ this.snapshot.addFirst(adjustedFragment);
+ this.remainingFragments.addFirst(adjustedFragment);
+ }
- // Create and add a fragment for processing the snapshot process
+ // Create a fragment that represents taking the snapshot itself and add it to the front of the remaining fragments
TraceFragment snapshotFragment = new TraceFragment(
this.checkpointDuration,
this.snapshot.getMaxCpuDemand(),
@@ -370,11 +422,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.snapshot.getMaxGpuMemoryDemand());
this.remainingFragments.addFirst(snapshotFragment);
+ // Add the checkpoint duration for bookkeeping
+ this.snapshot.checkpointDelay += this.checkpointDuration;
+
this.fragmentIndex = -1;
startNextFragment();
- this.startOfFragment = now;
-
this.invalidate();
}
@@ -424,6 +477,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
long passedTime = getPassedTime(now);
this.updateRemainingWork(passedTime);
+ this.updateRemainingTime();
+
long next_deadline = this.getNextUpdateTime(now);
// Remove stage from the timer queue
@@ -432,6 +487,20 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
}
/**
+ * Push new demands for all resource types to the Virtual Machine
+ */
+ private void pushNewDemands() {
+ for (ResourceType resourceType : this.usedResourceTypes) {
+ if (this.machineResourceEdges[resourceType.ordinal()] != null) {
+ this.pushOutgoingDemand(
+ this.machineResourceEdges[resourceType.ordinal()],
+ this.resourcesDemand[resourceType.ordinal()],
+ resourceType);
+ }
+ }
+ }
+
+ /**
* Push a new demand to the Virtual Machine
*
* @param supplierEdge edge to the VM on which this is running
@@ -505,6 +574,11 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.stopWorkload();
}
+ /**
+ * Get all connected edges to this workload
+ *
+ * @return A map of connected edges categorized by their node type.
+ */
@Override
public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() {
Map<FlowEdge.NodeType, List<FlowEdge>> connectedEdges = new HashMap<>();
@@ -517,9 +591,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
return connectedEdges;
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Util Methods
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
private boolean areAllEdgesNull() {
for (FlowEdge edge : this.machineResourceEdges) {
if (edge != null) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
index 53ce9f31..161a8041 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
@@ -46,6 +46,9 @@ public class TraceWorkload implements Workload {
private final int taskId;
private final ResourceType[] resourceTypes;
+ public long checkpointDelay = 0;
+ public long failureDelay = 0;
+
public ScalingPolicy getScalingPolicy() {
return scalingPolicy;
}
@@ -116,10 +119,19 @@ public class TraceWorkload implements Workload {
return taskId;
}
+ public long failureDelay() {
+ return failureDelay;
+ }
+
+ public long checkpointDelay() {
+ return checkpointDelay;
+ }
+
public void removeFragments(int numberOfFragments) {
if (numberOfFragments <= 0) {
return;
}
+
this.fragments.subList(0, numberOfFragments).clear();
}