diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-11-13 19:44:43 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-13 19:44:43 +0100 |
| commit | 90d4369183a420689fb1d48687a77ec677572433 (patch) | |
| tree | 6b5bf9dbc1c55700620073a099ed22690e799526 /opendc-simulator/opendc-simulator-compute/src/main | |
| parent | 71f63618fb83c8e19ae48d5dc4a6e3927031cc10 (diff) | |
Updated the checkpointModel and OnUpdate of SimTraceWorkload (#380)
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src/main')
5 files changed, 210 insertions, 103 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java index 56e6093b..cc3e0cfa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java @@ -35,6 +35,26 @@ public record ChainWorkload( double checkpointIntervalScaling) implements Workload { + public long failureDelay() { + long duration_saved = 0L; + + for (Workload wl : workloads) { + duration_saved += wl.failureDelay(); + } + + return duration_saved; + } + + public long checkpointDelay() { + long duration_added = 0L; + + for (Workload wl : workloads) { + duration_added += wl.checkpointDelay(); + } + + return duration_added; + } + public void removeWorkloads(int numberOfWorkloads) { if (numberOfWorkloads <= 0) { return; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java index d34da203..2027454a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java @@ -37,7 +37,7 @@ public class CheckpointModel extends FlowNode { private SimWorkload simWorkload; private long checkpointInterval; private final long checkpointDuration; - private double checkpointIntervalScaling; + private final double checkpointIntervalScaling; private long startOfInterval; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java index 5edacb3b..3fcad483 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java @@ -35,6 +35,10 @@ public interface Workload { double checkpointIntervalScaling(); + long failureDelay(); + + long checkpointDelay(); + SimWorkload startWorkload(FlowSupplier supplier); SimWorkload startWorkload(List<FlowSupplier> supplier, SimMachine machine, Consumer<Exception> completion); diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index ff65fbf2..5e8744d0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -47,21 +47,28 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private TraceFragment currentFragment; private long startOfFragment; + // The resources used by this workload and the edges to the components + private final ArrayList<ResourceType> usedResourceTypes = new ArrayList<>(); private final FlowEdge[] machineResourceEdges = new FlowEdge[ResourceType.values().length]; - // TODO: Currently GPU memory is not considered and can not be used - private final ArrayList<ResourceType> usedResourceTypes = new ArrayList<>(); + // the currently supplied resources + private final double[] resourcesSupplied = new double[ResourceType.values().length]; + + // The demands per resource type + private final double[] resourcesDemand = new double[ResourceType.values().length]; + + // The remaining work of the current fragment per resource type (depends on the scaling policy) + private final double[] remainingWork = new double[ResourceType.values().length]; - private final double[] resourcesSupplied = - new double[ResourceType.values().length]; // the currently supplied resources - private final double[] resourcesDemand = new double[ResourceType.values().length]; // The demands per resource type - private final double[] remainingWork = - new double[ResourceType.values().length]; // The duration of the fragment at the demanded speeds - private double totalRemainingWork = - 0.0; // The total remaining work of the fragment across all resources, used to determine the end of the + // The remaining time for each resource type + private final long[] remainingTime = new long[ResourceType.values().length]; + + // Finished resources for the current fragment (Only relevant when multiple resource types are used) + private final boolean[] resourceFinished = new boolean[ResourceType.values().length]; + + // The total remaining work of the fragment across all resources, used to determine the end of the // fragment - private final boolean[] workloadFinished = - new boolean[ResourceType.values().length]; // The workload finished for each resource type + private double totalRemainingWork = 0.0; private final long checkpointDuration; private final TraceWorkload snapshot; @@ -96,6 +103,14 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return 0; } + public long getFailureDelay() { + return this.snapshot.failureDelay; + } + + public long getCheckpointDelay() { + return this.snapshot.checkpointDelay; + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -145,16 +160,24 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // Fragment related functionality //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public boolean isWorkloadFinished() { + /** + * Check if all resources have finished their work for the current fragment + */ + public boolean getAllResourcesFinished() { for (ResourceType resourceType : this.usedResourceTypes) { - if (!this.workloadFinished[resourceType.ordinal()]) { + if (!this.resourceFinished[resourceType.ordinal()]) { return false; } } return true; } - // Update the remaining work for all resources based on the time passed since last update + /** + * Use the ScalingPolicy, time since the last update, demands and supplied resources + * to update the remaining work for each resource + * + * @param passedTime Time passed since the last update in milliseconds + */ private void updateRemainingWork(long passedTime) { for (ResourceType resourceType : this.usedResourceTypes) { // The amount of work done since last update @@ -163,60 +186,68 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.resourcesSupplied[resourceType.ordinal()], passedTime); - // TODO: maybe remove Math.max, as as we are already checking for <= 0 this.remainingWork[resourceType.ordinal()] = Math.max(0, this.remainingWork[resourceType.ordinal()] - finishedWork); this.totalRemainingWork -= finishedWork; if (this.remainingWork[resourceType.ordinal()] <= 0) { - this.workloadFinished[resourceType.ordinal()] = true; + this.resourceFinished[resourceType.ordinal()] = true; } } } - // Determine the next update time based on the remaining work and supplied resources - private long getNextUpdateTime(long now) { - long timeUntilNextUpdate = Long.MIN_VALUE; - + /** + * Update the remaining time for each resource using the ScalingPolicy, remaining work and supplied resources + */ + private void updateRemainingTime() { for (ResourceType resourceType : this.usedResourceTypes) { - // The amount of time required to finish the fragment at this speed - long remainingDuration = this.scalingPolicy.getRemainingDuration( + this.remainingTime[resourceType.ordinal()] = this.scalingPolicy.getRemainingDuration( this.resourcesDemand[resourceType.ordinal()], this.resourcesSupplied[resourceType.ordinal()], this.remainingWork[resourceType.ordinal()]); + } + } - if ((int) remainingDuration == 0) { - // if resource not initialized, then nothing happens - if (this.remainingWork[resourceType.ordinal()] >= 0.0) { - this.totalRemainingWork -= this.remainingWork[resourceType.ordinal()]; - } - this.remainingWork[resourceType.ordinal()] = 0.0; - this.workloadFinished[resourceType.ordinal()] = true; - } - - // The next update should happen when the fastest resource is done, so that it is no longer tracked when - // unused - if (remainingDuration > 0 - && (timeUntilNextUpdate == Long.MIN_VALUE || remainingDuration < timeUntilNextUpdate)) { - timeUntilNextUpdate = remainingDuration; - } + /** + * Get the next update time based on the remaining time of each resource + * The next update time is when the fastest resource that is not yet finished will finish + * + * @param now Current time in milliseconds + * @return The next update time in milliseconds + */ + private long getNextUpdateTime(long now) { + if (this.getAllResourcesFinished()) { + return now; } - return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate; - } + long timeUntilNextUpdate = Long.MAX_VALUE; - private void pushNewDemands() { for (ResourceType resourceType : this.usedResourceTypes) { - if (this.machineResourceEdges[resourceType.ordinal()] != null) { - this.pushOutgoingDemand( - this.machineResourceEdges[resourceType.ordinal()], - this.resourcesDemand[resourceType.ordinal()], - resourceType); + long remainingTime = this.remainingTime[resourceType.ordinal()]; + + // The next update should happen when the fastest resource is done + if (!this.resourceFinished[resourceType.ordinal()] && remainingTime < timeUntilNextUpdate) { + timeUntilNextUpdate = remainingTime; } } + + return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate; } + /** + * Handle an update event for this workload + * <p> + * There are three possible scenarios: + * <ol> + * <li>The fragment is completed across all resources: start the next fragment and call onUpdate again</li> + * <li>The fragment is not yet completed: push new demands and update remaining time</li> + * <li>The workload is completed: stop the workload</li> + * </ol> + * + * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @return The next update time in milliseconds. + */ @Override public long onUpdate(long now) { long passedTime = getPassedTime(now); @@ -225,28 +256,27 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.updateRemainingWork(passedTime); // If this.totalRemainingWork <= 0, the fragment has been completed across all resources - if ((int) this.totalRemainingWork <= 0 && this.isWorkloadFinished()) { + if ((int) this.totalRemainingWork <= 0) { this.startNextFragment(); if (this.nodeState == NodeState.CLOSING || this.nodeState == NodeState.CLOSED) { return Long.MAX_VALUE; } - return getNextUpdateTime(this.startOfFragment); + return this.onUpdate(now); } this.pushNewDemands(); + this.updateRemainingTime(); - long nextUpdate = getNextUpdateTime(this.startOfFragment); - - // if for all resources the remaining work is 0, then invalidate the workload, to reschedule the next fragment - if (nextUpdate == now + Long.MIN_VALUE) { - this.invalidate(); - return Long.MAX_VALUE; - } - return nextUpdate; + return getNextUpdateTime(this.startOfFragment); } + /** + * Get the next fragment to be executed + * + * @return The next TraceFragment or null if there are no more fragments + */ public TraceFragment getNextFragment() { if (this.remainingFragments.isEmpty()) { return null; @@ -257,8 +287,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return this.currentFragment; } + /** + * Start the next fragment by resetting the remaining work and pushing new demands to the VM + * + * If no more fragments are left, stopWorkload is called. + */ private void startNextFragment() { - TraceFragment nextFragment = this.getNextFragment(); if (nextFragment == null) { this.stopWorkload(); @@ -274,7 +308,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.remainingWork[resourceType.ordinal()] = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration()); this.totalRemainingWork += this.remainingWork[resourceType.ordinal()]; - this.workloadFinished[resourceType.ordinal()] = false; + this.resourceFinished[resourceType.ordinal()] = false; if (this.machineResourceEdges[resourceType.ordinal()] != null) { this.pushOutgoingDemand(this.machineResourceEdges[resourceType.ordinal()], demand, resourceType); @@ -282,20 +316,46 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { } } + /** + * Stop the workload and clean up resources. All connected edges are notified. + * <p> + * stopWorkload can be called in two scenarios: + * <ol> + * <li>The workload has completed successfully</li> + * <li>The workload is stopped because the task is paused of the host has failed + * </ol> + * <p> + * On failure, the wasted time is calculated for bookkeeping purposes. + */ @Override public void stopWorkload() { + + // If the workload is stopped due to an error or failure, calculate the wasted time for bookkeeping. + if (this.totalRemainingWork > 0.0 || !this.remainingFragments.isEmpty()) { + // Failure + + this.updateRemainingWork(this.clock.millis() - this.startOfFragment); + + for (int i = 0; i < this.fragmentIndex; i++) { + this.snapshot.failureDelay += + this.snapshot.getFragments().get(i).duration(); + } + this.snapshot.failureDelay -= (long) this.totalRemainingWork; + } + + // The workload has already been stopped if (areAllEdgesNull()) { return; } // TODO: Maybe move this to the end - // Currently stopWorkload is called twice this.closeNode(); for (ResourceType resourceType : this.usedResourceTypes) { this.machineResourceEdges[resourceType.ordinal()] = null; - this.workloadFinished[resourceType.ordinal()] = true; + this.resourceFinished[resourceType.ordinal()] = true; } + this.remainingFragments = null; this.currentFragment = null; } @@ -306,63 +366,55 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { /** * SimTraceWorkload does not make a checkpoint, checkpointing is handled by SimChainWorkload - * TODO: Maybe add checkpoint models for SimTraceWorkload */ @Override public void createCheckpointModel() {} /** - * Create a new snapshot based on the current status of the workload. - * @param now Moment on which the snapshot is made in milliseconds + * Create a snapshot of the current state of the workload + * <p> + * First calculate the remaining work of the current fragment based on the time passed since the last update. + * Then, Create a new fragment based on the current fragment and the remaining work. + * Finally, update the snapshot and the remaining fragments. + * <p> + * The snapshot contains all remaining fragments, with the current fragment adjusted to only include the remaining + * work. + * + * @param now Current time in milliseconds */ public void makeSnapshot(long now) { - - // Check if fragments is empty - - // Get remaining time of current fragment long passedTime = getPassedTime(now); + this.startOfFragment = now; - // The amount of work done since last update - for (ResourceType resourceType : this.usedResourceTypes) { - double finishedWork = this.scalingPolicy.getFinishedWork( - this.resourcesDemand[resourceType.ordinal()], - this.resourcesSupplied[resourceType.ordinal()], - passedTime); - this.remainingWork[resourceType.ordinal()] = this.remainingWork[resourceType.ordinal()] - finishedWork; - this.totalRemainingWork -= finishedWork; - } - - long remainingDuration = 0; - for (ResourceType resourceType : this.usedResourceTypes) { + this.updateRemainingWork(passedTime); + this.updateRemainingTime(); - // The amount of time required to finish the fragment at this speed - remainingDuration = Math.max( - remainingDuration, - this.scalingPolicy.getRemainingDuration( - this.resourcesDemand[resourceType.ordinal()], - this.resourcesSupplied[resourceType.ordinal()], - this.remainingWork[resourceType.ordinal()])); - } + long remainingDuration = Arrays.stream(this.remainingTime).max().orElseThrow(); // If this is the end of the Task, don't make a snapshot if (this.currentFragment == null || (remainingDuration <= 0 && remainingFragments.isEmpty())) { return; } - // Create a new fragment based on the current fragment and remaining duration - TraceFragment newFragment = new TraceFragment( - remainingDuration, - currentFragment.cpuUsage(), - currentFragment.gpuUsage(), - currentFragment.gpuMemoryUsage()); - - // Alter the snapshot by removing finished fragments + // Remove all fragments up to and including the current fragment from the snapshot + // These fragments will have to be re-executed after a failure this.snapshot.removeFragments(this.fragmentIndex); - this.snapshot.addFirst(newFragment); - this.remainingFragments.addFirst(newFragment); + // Create a new fragment with the same resource usage as the current fragment, + // but with the remaining duration. Put the adjusted fragment at the front of the + // remaining fragments and snapshot + if (remainingDuration > 0) { + TraceFragment adjustedFragment = new TraceFragment( + remainingDuration, + currentFragment.cpuUsage(), + currentFragment.gpuUsage(), + currentFragment.gpuMemoryUsage()); + + this.snapshot.addFirst(adjustedFragment); + this.remainingFragments.addFirst(adjustedFragment); + } - // Create and add a fragment for processing the snapshot process + // Create a fragment for processing the snapshot process and add it to the front of the remaining fragments TraceFragment snapshotFragment = new TraceFragment( this.checkpointDuration, this.snapshot.getMaxCpuDemand(), @@ -370,11 +422,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.snapshot.getMaxGpuMemoryDemand()); this.remainingFragments.addFirst(snapshotFragment); + // Add the checkpoint duration for bookkeeping + this.snapshot.checkpointDelay += this.checkpointDuration; + this.fragmentIndex = -1; startNextFragment(); - this.startOfFragment = now; - this.invalidate(); } @@ -424,6 +477,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { long passedTime = getPassedTime(now); this.updateRemainingWork(passedTime); + this.updateRemainingTime(); + long next_deadline = this.getNextUpdateTime(now); // Remove stage from the timer queue @@ -432,6 +487,20 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { } /** + * Push new demands for all resource types to the Virtual Machine + */ + private void pushNewDemands() { + for (ResourceType resourceType : this.usedResourceTypes) { + if (this.machineResourceEdges[resourceType.ordinal()] != null) { + this.pushOutgoingDemand( + this.machineResourceEdges[resourceType.ordinal()], + this.resourcesDemand[resourceType.ordinal()], + resourceType); + } + } + } + + /** * Push a new demand to the Virtual Machine * * @param supplierEdge edge to the VM on which this is running @@ -505,6 +574,11 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.stopWorkload(); } + /** + * Get all connected edges to this workload + * + * @return A map of connected edges categorized by their node type. + */ @Override public Map<FlowEdge.NodeType, List<FlowEdge>> getConnectedEdges() { Map<FlowEdge.NodeType, List<FlowEdge>> connectedEdges = new HashMap<>(); @@ -517,9 +591,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return connectedEdges; } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Util Methods - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// private boolean areAllEdgesNull() { for (FlowEdge edge : this.machineResourceEdges) { if (edge != null) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 53ce9f31..161a8041 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -46,6 +46,9 @@ public class TraceWorkload implements Workload { private final int taskId; private final ResourceType[] resourceTypes; + public long checkpointDelay = 0; + public long failureDelay = 0; + public ScalingPolicy getScalingPolicy() { return scalingPolicy; } @@ -116,10 +119,19 @@ public class TraceWorkload implements Workload { return taskId; } + public long failureDelay() { + return failureDelay; + } + + public long checkpointDelay() { + return checkpointDelay; + } + public void removeFragments(int numberOfFragments) { if (numberOfFragments <= 0) { return; } + this.fragments.subList(0, numberOfFragments).clear(); } |
