From 90d4369183a420689fb1d48687a77ec677572433 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Thu, 13 Nov 2025 19:44:43 +0100 Subject: Updated the checkpointModel and OnUpdate of SimTraceWorkload (#380) --- .../compute/simulator/service/ComputeService.java | 4 + .../compute/simulator/service/ServiceTask.java | 14 +- .../compute/simulator/scheduler/FilterScheduler.kt | 5 + .../telemetry/parquet/DfltTaskExportColumns.kt | 15 ++ .../telemetry/table/task/TaskTableReader.kt | 6 + .../telemetry/table/task/TaskTableReaderImpl.kt | 20 ++ .../experiments/base/runner/ScenarioRunner.kt | 2 +- .../base/FailuresAndCheckpointingTest.kt | 1 - .../simulator/compute/workload/ChainWorkload.java | 20 ++ .../compute/workload/CheckpointModel.java | 2 +- .../simulator/compute/workload/Workload.java | 4 + .../compute/workload/trace/SimTraceWorkload.java | 275 +++++++++++++-------- .../compute/workload/trace/TraceWorkload.java | 12 + 13 files changed, 273 insertions(+), 107 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java index f357b164..f4adaac1 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ComputeService.java @@ -573,6 +573,10 @@ public final class ComputeService implements AutoCloseable, CarbonReceiver { activeTasks.put(task, host); updateHost(host); + + long new_scheduling_delay = clock.millis() - req.getSubmitTime() + task.getSchedulingDelay(); + task.setSchedulingDelay(new_scheduling_delay); + } catch (Exception cause) { LOGGER.error("Failed to deploy VM", cause); scheduler.removeTask(task, hv); diff --git 
a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java index 8c066e4f..689a0e95 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java +++ b/opendc-compute/opendc-compute-simulator/src/main/java/org/opendc/compute/simulator/service/ServiceTask.java @@ -75,6 +75,8 @@ public class ServiceTask { private int numFailures = 0; private int numPauses = 0; + private long schedulingDelay = 0; + /// ////////////////////////////////////////////////////////////////////////////////////////////////// /// Getters and Setters /// ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -343,12 +345,12 @@ public class ServiceTask { case PAUSED: LOGGER.info("User requested to start task after pause {}", id); setState(TaskState.PROVISIONING); - request = service.schedule(this, true); + request = service.schedule(this, false); break; case FAILED: LOGGER.info("User requested to start task after failure {}", id); setState(TaskState.PROVISIONING); - request = service.schedule(this, true); + request = service.schedule(this, false); break; } } @@ -433,4 +435,12 @@ public class ServiceTask { return !parents.isEmpty(); } + + public long getSchedulingDelay() { + return schedulingDelay; + } + + public void setSchedulingDelay(long schedulingDelay) { + this.schedulingDelay = schedulingDelay; + } } diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt index 3dab71b0..913060e5 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt +++ 
b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/scheduler/FilterScheduler.kt @@ -22,6 +22,7 @@ package org.opendc.compute.simulator.scheduler +import org.opendc.compute.simulator.host.HostState import org.opendc.compute.simulator.scheduler.filters.HostFilter import org.opendc.compute.simulator.scheduler.weights.HostWeigher import org.opendc.compute.simulator.service.HostView @@ -101,6 +102,10 @@ public class FilterScheduler( } override fun updateHost(hostView: HostView) { + if (hostView.host.getState() == HostState.ERROR) { + return + } + if (hostView.host.isEmpty()) { setHostEmpty(hostView) } else { diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt index b88c75e6..96cb5f65 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/parquet/DfltTaskExportColumns.kt @@ -224,6 +224,21 @@ public object DfltTaskExportColumns { return@ExportColumn Binary.fromString(it.taskState!!.name) } + public val schedulingDelay: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("scheduling_delay"), + ) { it.schedulingDelay } + + public val failureDelay: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("failure_delay"), + ) { it.failureDelay } + + public val checkpointDelay: ExportColumn = + ExportColumn( + field = Types.optional(INT64).named("checkpoint_delay"), + ) { it.checkpointDelay } + /** * The columns that are always included in the output file. 
*/ diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt index 1f18ee50..cef3d8be 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReader.kt @@ -169,6 +169,12 @@ public interface TaskTableReader : Exportable { * The state of the task */ public val taskState: TaskState? + + public val schedulingDelay: Long + + public val failureDelay: Long + + public val checkpointDelay: Long } // Loads the default export fields for deserialization whenever this file is loaded. diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt index d4d5c7a6..be858e4f 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/telemetry/table/task/TaskTableReaderImpl.kt @@ -82,6 +82,10 @@ public class TaskTableReaderImpl( _finishTime = table.finishTime _taskState = table.taskState + + _schedulingDelay = table.schedulingDelay + _failureDelay = table.failureDelay + _checkpointDelay = table.checkpointDelay } /** @@ -214,6 +218,18 @@ public class TaskTableReaderImpl( get() = _taskState private var _taskState: TaskState? 
= null + override val schedulingDelay: Long + get() = _schedulingDelay + private var _schedulingDelay: Long = 0L + + override val failureDelay: Long + get() = _failureDelay + private var _failureDelay: Long = 0L + + override val checkpointDelay: Long + get() = _checkpointDelay + private var _checkpointDelay: Long = 0L + /** * Record the next cycle. */ @@ -257,6 +273,10 @@ public class TaskTableReaderImpl( _scheduleTime = task.scheduledAt _finishTime = task.finishedAt + _schedulingDelay = task.schedulingDelay + _failureDelay = task.workload.failureDelay() + _checkpointDelay = task.workload.checkpointDelay() + if (gpuStats != null) { _gpuLimit = gpuStats.capacity _gpuDemand = gpuStats.demand diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index a29a1dd5..b07a50ae 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -180,7 +180,7 @@ public fun runScenario( } /** - * Saves the simulation results into a specific output folder received from the input. + * Saves the simulation results into a specific output folder received from the input.A * * @param provisioner The provisioner used to setup and run the simulation. * @param serviceDomain The domain of the compute service. 
diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt index 1d0acbfe..c49d4efd 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt @@ -270,7 +270,6 @@ class FailuresAndCheckpointingTest { monitor.maxTimestamp, ) { "Total runtime incorrect" } }, - // TODO: The energy draw of last item (56 * 150.0) is wrong. Figure out why? { assertEquals( (10 * 60 * 150.0) + (5 * 60 * 100.0) + (9 * 150.0) + (56 * 150.0), diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java index 56e6093b..cc3e0cfa 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/ChainWorkload.java @@ -35,6 +35,26 @@ public record ChainWorkload( double checkpointIntervalScaling) implements Workload { + public long failureDelay() { + long duration_saved = 0L; + + for (Workload wl : workloads) { + duration_saved += wl.failureDelay(); + } + + return duration_saved; + } + + public long checkpointDelay() { + long duration_added = 0L; + + for (Workload wl : workloads) { + duration_added += wl.checkpointDelay(); + } + + return duration_added; + } + public void removeWorkloads(int numberOfWorkloads) { if (numberOfWorkloads <= 0) { return; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java 
b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java index d34da203..2027454a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/CheckpointModel.java @@ -37,7 +37,7 @@ public class CheckpointModel extends FlowNode { private SimWorkload simWorkload; private long checkpointInterval; private final long checkpointDuration; - private double checkpointIntervalScaling; + private final double checkpointIntervalScaling; private long startOfInterval; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java index 5edacb3b..3fcad483 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/Workload.java @@ -35,6 +35,10 @@ public interface Workload { double checkpointIntervalScaling(); + long failureDelay(); + + long checkpointDelay(); + SimWorkload startWorkload(FlowSupplier supplier); SimWorkload startWorkload(List supplier, SimMachine machine, Consumer completion); diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index ff65fbf2..5e8744d0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ 
-47,21 +47,28 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private TraceFragment currentFragment; private long startOfFragment; + // The resources used by this workload and the edges to the components + private final ArrayList usedResourceTypes = new ArrayList<>(); private final FlowEdge[] machineResourceEdges = new FlowEdge[ResourceType.values().length]; - // TODO: Currently GPU memory is not considered and can not be used - private final ArrayList usedResourceTypes = new ArrayList<>(); + // the currently supplied resources + private final double[] resourcesSupplied = new double[ResourceType.values().length]; + + // The demands per resource type + private final double[] resourcesDemand = new double[ResourceType.values().length]; + + // The remaining work of the current fragment per resource type (depends on the scaling policy) + private final double[] remainingWork = new double[ResourceType.values().length]; - private final double[] resourcesSupplied = - new double[ResourceType.values().length]; // the currently supplied resources - private final double[] resourcesDemand = new double[ResourceType.values().length]; // The demands per resource type - private final double[] remainingWork = - new double[ResourceType.values().length]; // The duration of the fragment at the demanded speeds - private double totalRemainingWork = - 0.0; // The total remaining work of the fragment across all resources, used to determine the end of the + // The remaining time for each resource type + private final long[] remainingTime = new long[ResourceType.values().length]; + + // Finished resources for the current fragment (Only relevant when multiple resource types are used) + private final boolean[] resourceFinished = new boolean[ResourceType.values().length]; + + // The total remaining work of the fragment across all resources, used to determine the end of the // fragment - private final boolean[] workloadFinished = - new 
boolean[ResourceType.values().length]; // The workload finished for each resource type + private double totalRemainingWork = 0.0; private final long checkpointDuration; private final TraceWorkload snapshot; @@ -96,6 +103,14 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return 0; } + public long getFailureDelay() { + return this.snapshot.failureDelay; + } + + public long getCheckpointDelay() { + return this.snapshot.checkpointDelay; + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -145,16 +160,24 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // Fragment related functionality //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - public boolean isWorkloadFinished() { + /** + * Check if all resources have finished their work for the current fragment + */ + public boolean getAllResourcesFinished() { for (ResourceType resourceType : this.usedResourceTypes) { - if (!this.workloadFinished[resourceType.ordinal()]) { + if (!this.resourceFinished[resourceType.ordinal()]) { return false; } } return true; } - // Update the remaining work for all resources based on the time passed since last update + /** + * Use the ScalingPolicy, time since the last update, demands and supplied resources + * to update the remaining work for each resource + * + * @param passedTime Time passed since the last update in milliseconds + */ private void updateRemainingWork(long passedTime) { for (ResourceType resourceType : this.usedResourceTypes) { // The amount of work done since last update @@ -163,60 +186,68 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.resourcesSupplied[resourceType.ordinal()], passedTime); - // TODO: maybe 
remove Math.max, as as we are already checking for <= 0 this.remainingWork[resourceType.ordinal()] = Math.max(0, this.remainingWork[resourceType.ordinal()] - finishedWork); this.totalRemainingWork -= finishedWork; if (this.remainingWork[resourceType.ordinal()] <= 0) { - this.workloadFinished[resourceType.ordinal()] = true; + this.resourceFinished[resourceType.ordinal()] = true; } } } - // Determine the next update time based on the remaining work and supplied resources - private long getNextUpdateTime(long now) { - long timeUntilNextUpdate = Long.MIN_VALUE; - + /** + * Update the remaining time for each resource using the ScalingPolicy, remaining work and supplied resources + */ + private void updateRemainingTime() { for (ResourceType resourceType : this.usedResourceTypes) { - // The amount of time required to finish the fragment at this speed - long remainingDuration = this.scalingPolicy.getRemainingDuration( + this.remainingTime[resourceType.ordinal()] = this.scalingPolicy.getRemainingDuration( this.resourcesDemand[resourceType.ordinal()], this.resourcesSupplied[resourceType.ordinal()], this.remainingWork[resourceType.ordinal()]); + } + } - if ((int) remainingDuration == 0) { - // if resource not initialized, then nothing happens - if (this.remainingWork[resourceType.ordinal()] >= 0.0) { - this.totalRemainingWork -= this.remainingWork[resourceType.ordinal()]; - } - this.remainingWork[resourceType.ordinal()] = 0.0; - this.workloadFinished[resourceType.ordinal()] = true; - } - - // The next update should happen when the fastest resource is done, so that it is no longer tracked when - // unused - if (remainingDuration > 0 - && (timeUntilNextUpdate == Long.MIN_VALUE || remainingDuration < timeUntilNextUpdate)) { - timeUntilNextUpdate = remainingDuration; - } + /** + * Get the next update time based on the remaining time of each resource + * The next update time is when the fastest resource that is not yet finished will finish + * + * @param now Current time in 
milliseconds + * @return The next update time in milliseconds + */ + private long getNextUpdateTime(long now) { + if (this.getAllResourcesFinished()) { + return now; } - return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate; - } + long timeUntilNextUpdate = Long.MAX_VALUE; - private void pushNewDemands() { for (ResourceType resourceType : this.usedResourceTypes) { - if (this.machineResourceEdges[resourceType.ordinal()] != null) { - this.pushOutgoingDemand( - this.machineResourceEdges[resourceType.ordinal()], - this.resourcesDemand[resourceType.ordinal()], - resourceType); + long remainingTime = this.remainingTime[resourceType.ordinal()]; + + // The next update should happen when the fastest resource is done + if (!this.resourceFinished[resourceType.ordinal()] && remainingTime < timeUntilNextUpdate) { + timeUntilNextUpdate = remainingTime; } } + + return timeUntilNextUpdate == Long.MAX_VALUE ? Long.MAX_VALUE : now + timeUntilNextUpdate; } + /** + * Handle an update event for this workload + *

+ * There are three possible scenarios: + *

    + *
  1. The fragment is completed across all resources: start the next fragment and call onUpdate again + *
  2. The fragment is not yet completed: push new demands and update remaining time + *
  3. The workload is completed: stop the workload + *
+ * + * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @return The next update time in milliseconds. + */ @Override public long onUpdate(long now) { long passedTime = getPassedTime(now); @@ -225,28 +256,27 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.updateRemainingWork(passedTime); // If this.totalRemainingWork <= 0, the fragment has been completed across all resources - if ((int) this.totalRemainingWork <= 0 && this.isWorkloadFinished()) { + if ((int) this.totalRemainingWork <= 0) { this.startNextFragment(); if (this.nodeState == NodeState.CLOSING || this.nodeState == NodeState.CLOSED) { return Long.MAX_VALUE; } - return getNextUpdateTime(this.startOfFragment); + return this.onUpdate(now); } this.pushNewDemands(); + this.updateRemainingTime(); - long nextUpdate = getNextUpdateTime(this.startOfFragment); - - // if for all resources the remaining work is 0, then invalidate the workload, to reschedule the next fragment - if (nextUpdate == now + Long.MIN_VALUE) { - this.invalidate(); - return Long.MAX_VALUE; - } - return nextUpdate; + return getNextUpdateTime(this.startOfFragment); } + /** + * Get the next fragment to be executed + * + * @return The next TraceFragment or null if there are no more fragments + */ public TraceFragment getNextFragment() { if (this.remainingFragments.isEmpty()) { return null; @@ -257,8 +287,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return this.currentFragment; } + /** + * Start the next fragment by resetting the remaining work and pushing new demands to the VM + * + * If no more fragments are left, stopWorkload is called. 
+ */ private void startNextFragment() { - TraceFragment nextFragment = this.getNextFragment(); if (nextFragment == null) { this.stopWorkload(); @@ -274,7 +308,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.remainingWork[resourceType.ordinal()] = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration()); this.totalRemainingWork += this.remainingWork[resourceType.ordinal()]; - this.workloadFinished[resourceType.ordinal()] = false; + this.resourceFinished[resourceType.ordinal()] = false; if (this.machineResourceEdges[resourceType.ordinal()] != null) { this.pushOutgoingDemand(this.machineResourceEdges[resourceType.ordinal()], demand, resourceType); @@ -282,20 +316,46 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { } } + /** + * Stop the workload and clean up resources. All connected edges are notified. + *

+ * stopWorkload can be called in two scenarios: + *

    + *
  1. The workload has completed successfully + *
  2. The workload is stopped because the task is paused or the host has failed + *
+ *

+ * On failure, the wasted time is calculated for bookkeeping purposes. + */ @Override public void stopWorkload() { + + // If the workload is stopped due to an error or failure, calculate the wasted time for bookkeeping. + if (this.totalRemainingWork > 0.0 || !this.remainingFragments.isEmpty()) { + // Failure + + this.updateRemainingWork(this.clock.millis() - this.startOfFragment); + + for (int i = 0; i < this.fragmentIndex; i++) { + this.snapshot.failureDelay += + this.snapshot.getFragments().get(i).duration(); + } + this.snapshot.failureDelay -= (long) this.totalRemainingWork; + } + + // The workload has already been stopped if (areAllEdgesNull()) { return; } // TODO: Maybe move this to the end - // Currently stopWorkload is called twice this.closeNode(); for (ResourceType resourceType : this.usedResourceTypes) { this.machineResourceEdges[resourceType.ordinal()] = null; - this.workloadFinished[resourceType.ordinal()] = true; + this.resourceFinished[resourceType.ordinal()] = true; } + this.remainingFragments = null; this.currentFragment = null; } @@ -306,63 +366,55 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { /** * SimTraceWorkload does not make a checkpoint, checkpointing is handled by SimChainWorkload - * TODO: Maybe add checkpoint models for SimTraceWorkload */ @Override public void createCheckpointModel() {} /** - * Create a new snapshot based on the current status of the workload. - * @param now Moment on which the snapshot is made in milliseconds + * Create a snapshot of the current state of the workload + *

+ * First calculate the remaining work of the current fragment based on the time passed since the last update. + * Then, Create a new fragment based on the current fragment and the remaining work. + * Finally, update the snapshot and the remaining fragments. + *

+ * The snapshot contains all remaining fragments, with the current fragment adjusted to only include the remaining + * work. + * + * @param now Current time in milliseconds */ public void makeSnapshot(long now) { - - // Check if fragments is empty - - // Get remaining time of current fragment long passedTime = getPassedTime(now); + this.startOfFragment = now; - // The amount of work done since last update - for (ResourceType resourceType : this.usedResourceTypes) { - double finishedWork = this.scalingPolicy.getFinishedWork( - this.resourcesDemand[resourceType.ordinal()], - this.resourcesSupplied[resourceType.ordinal()], - passedTime); - this.remainingWork[resourceType.ordinal()] = this.remainingWork[resourceType.ordinal()] - finishedWork; - this.totalRemainingWork -= finishedWork; - } - - long remainingDuration = 0; - for (ResourceType resourceType : this.usedResourceTypes) { + this.updateRemainingWork(passedTime); + this.updateRemainingTime(); - // The amount of time required to finish the fragment at this speed - remainingDuration = Math.max( - remainingDuration, - this.scalingPolicy.getRemainingDuration( - this.resourcesDemand[resourceType.ordinal()], - this.resourcesSupplied[resourceType.ordinal()], - this.remainingWork[resourceType.ordinal()])); - } + long remainingDuration = Arrays.stream(this.remainingTime).max().orElseThrow(); // If this is the end of the Task, don't make a snapshot if (this.currentFragment == null || (remainingDuration <= 0 && remainingFragments.isEmpty())) { return; } - // Create a new fragment based on the current fragment and remaining duration - TraceFragment newFragment = new TraceFragment( - remainingDuration, - currentFragment.cpuUsage(), - currentFragment.gpuUsage(), - currentFragment.gpuMemoryUsage()); - - // Alter the snapshot by removing finished fragments + // Remove all fragments up to and including the current fragment from the snapshot + // These fragments will have to be re-executed after a failure 
this.snapshot.removeFragments(this.fragmentIndex); - this.snapshot.addFirst(newFragment); - this.remainingFragments.addFirst(newFragment); + // Create a new fragment with the same resource usage as the current fragment, + // but with the remaining duration. Put the adjusted fragment at the front of the + // remaining fragments and snapshot + if (remainingDuration > 0) { + TraceFragment adjustedFragment = new TraceFragment( + remainingDuration, + currentFragment.cpuUsage(), + currentFragment.gpuUsage(), + currentFragment.gpuMemoryUsage()); + + this.snapshot.addFirst(adjustedFragment); + this.remainingFragments.addFirst(adjustedFragment); + } - // Create and add a fragment for processing the snapshot process + // Create a fragment for processing the snapshot process and add it to the front of the remaining fragments TraceFragment snapshotFragment = new TraceFragment( this.checkpointDuration, this.snapshot.getMaxCpuDemand(), @@ -370,11 +422,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.snapshot.getMaxGpuMemoryDemand()); this.remainingFragments.addFirst(snapshotFragment); + // Add the checkpoint duration for bookkeeping + this.snapshot.checkpointDelay += this.checkpointDuration; + this.fragmentIndex = -1; startNextFragment(); - this.startOfFragment = now; - this.invalidate(); } @@ -424,6 +477,8 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { long passedTime = getPassedTime(now); this.updateRemainingWork(passedTime); + this.updateRemainingTime(); + long next_deadline = this.getNextUpdateTime(now); // Remove stage from the timer queue @@ -431,6 +486,20 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.engine.scheduleDelayedInContext(this); } + /** + * Push new demands for all resource types to the Virtual Machine + */ + private void pushNewDemands() { + for (ResourceType resourceType : this.usedResourceTypes) { + if (this.machineResourceEdges[resourceType.ordinal()] 
!= null) { + this.pushOutgoingDemand( + this.machineResourceEdges[resourceType.ordinal()], + this.resourcesDemand[resourceType.ordinal()], + resourceType); + } + } + } + /** * Push a new demand to the Virtual Machine * @@ -505,6 +574,11 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.stopWorkload(); } + /** + * Get all connected edges to this workload + * + * @return A map of connected edges categorized by their node type. + */ @Override public Map> getConnectedEdges() { Map> connectedEdges = new HashMap<>(); @@ -517,9 +591,6 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { return connectedEdges; } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Util Methods - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// private boolean areAllEdgesNull() { for (FlowEdge edge : this.machineResourceEdges) { if (edge != null) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 53ce9f31..161a8041 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -46,6 +46,9 @@ public class TraceWorkload implements Workload { private final int taskId; private final ResourceType[] resourceTypes; + public long checkpointDelay = 0; + public long failureDelay = 0; + public ScalingPolicy getScalingPolicy() { return scalingPolicy; } @@ -116,10 +119,19 @@ public class TraceWorkload implements Workload { return taskId; } + public long failureDelay() { + return failureDelay; + } + + public 
long checkpointDelay() { + return checkpointDelay; + } + public void removeFragments(int numberOfFragments) { if (numberOfFragments <= 0) { return; } + this.fragments.subList(0, numberOfFragments).clear(); } -- cgit v1.2.3