diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-compute/src')
8 files changed, 286 insertions, 62 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index 2919fc3a..ebfcc552 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -55,11 +55,11 @@ public abstract class SimWorkload extends FlowNode implements FlowConsumer { public abstract Workload getSnapshot(); - abstract void createCheckpointModel(); + public abstract void createCheckpointModel(); - abstract long getCheckpointInterval(); + public abstract long getCheckpointInterval(); - abstract long getCheckpointDuration(); + public abstract long getCheckpointDuration(); - abstract double getCheckpointIntervalScaling(); + public abstract double getCheckpointIntervalScaling(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 9b12b1e3..93733268 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -20,9 +20,12 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.workload; +package org.opendc.simulator.compute.workload.trace; import java.util.LinkedList; +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowGraph; @@ -37,13 +40,18 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private long startOfFragment; private FlowEdge machineEdge; - private double currentDemand; - private double currentSupply; + + private double cpuFreqDemand = 0.0; // The Cpu demanded by fragment + private double cpuFreqSupplied = 0.0; // The Cpu speed supplied + private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied + private double remainingWork = 0.0; // The duration of the fragment at the demanded speed private long checkpointDuration; private TraceWorkload snapshot; + private ScalingPolicy scalingPolicy = new NoDelayScaling(); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Basic Getters and Setters //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -57,27 +65,20 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { } @Override - long getCheckpointInterval() { + public long getCheckpointInterval() { return 0; } @Override - long getCheckpointDuration() { + public long getCheckpointDuration() { return 0; } @Override - double getCheckpointIntervalScaling() { + public double getCheckpointIntervalScaling() { return 0; } - public TraceFragment getNextFragment() { - this.currentFragment = this.remainingFragments.pop(); - this.fragmentIndex++; - - return this.currentFragment; - } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -87,15 +88,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.snapshot = workload; this.checkpointDuration = workload.getCheckpointDuration(); + this.scalingPolicy = workload.getScalingPolicy(); this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; final FlowGraph graph = ((FlowNode) supplier).getGraph(); graph.addEdge(this, supplier); - - this.currentFragment = this.getNextFragment(); - pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); - this.startOfFragment = now; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -105,36 +103,50 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { @Override public long onUpdate(long now) { long passedTime = getPassedTime(now); - long duration = this.currentFragment.duration(); + this.startOfFragment = now; - // The current Fragment has not yet been finished, continue - if (passedTime < duration) { - return now + (duration - passedTime); + // The amount of work done since last update + double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime); + + this.remainingWork -= finishedWork; + + // If this.remainingWork <= 0, the fragment has been completed + if (this.remainingWork <= 0) { + this.startNextFragment(); + + this.invalidate(); + return Long.MAX_VALUE; } - // Loop through fragments until the passed time is filled. - // We need a while loop to account for skipping of fragments. - while (passedTime >= duration) { - if (this.remainingFragments.isEmpty()) { - this.stopWorkload(); - return Long.MAX_VALUE; - } + this.cpuFreqSupplied = this.newCpuFreqSupplied; - passedTime = passedTime - duration; + // The amount of time required to finish the fragment at this speed + long remainingDuration = this.scalingPolicy.getRemainingDuration( + this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork); - // get next Fragment - currentFragment = this.getNextFragment(); - duration = currentFragment.duration(); + return now + remainingDuration; + } + + public TraceFragment getNextFragment() { + if (this.remainingFragments.isEmpty()) { + return null; } + this.currentFragment = this.remainingFragments.pop(); + this.fragmentIndex++; - // start new fragment - this.startOfFragment = now - passedTime; + return this.currentFragment; + } - // Change the cpu Usage to the new Fragment - pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); + private void startNextFragment() { - // Return the time when the current fragment will complete - return this.startOfFragment + duration; + TraceFragment nextFragment = this.getNextFragment(); + if (nextFragment == null) { + this.stopWorkload(); + return; + } + double demand = nextFragment.cpuUsage(); + this.remainingWork = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration()); + this.pushOutgoingDemand(this.machineEdge, demand); } @Override @@ -159,7 +171,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { * TODO: Maybe add checkpoint models for SimTraceWorkload */ @Override - void createCheckpointModel() {} + public void createCheckpointModel() {} /** * Create a new snapshot based on the current status of the workload. @@ -171,7 +183,15 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { // Get remaining time of current fragment long passedTime = getPassedTime(now); - long remainingTime = currentFragment.duration() - passedTime; + + // The amount of work done since last update + double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime); + + this.remainingWork -= finishedWork; + + // The amount of time required to finish the fragment at this speed + long remainingTime = + this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork); // If this is the end of the Task, don't make a snapshot if (remainingTime <= 0 && remainingFragments.isEmpty()) { @@ -189,13 +209,13 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { this.remainingFragments.addFirst(newFragment); // Create and add a fragment for processing the snapshot process - // TODO: improve the implementation of cpuUsage and coreCount - TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, 123456, 1); + TraceFragment snapshotFragment = new TraceFragment( + this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount()); this.remainingFragments.addFirst(snapshotFragment); this.fragmentIndex = -1; - this.currentFragment = getNextFragment(); - pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage()); + startNextFragment(); + this.startOfFragment = now; this.invalidate(); @@ -213,11 +233,14 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { */ @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { - if (newSupply == this.currentSupply) { + if (newSupply == this.cpuFreqSupplied) { return; } - this.currentSupply = newSupply; + this.cpuFreqSupplied = this.newCpuFreqSupplied; + this.newCpuFreqSupplied = newSupply; + + this.invalidate(); } /** @@ -228,11 +251,11 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { */ @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - if (newDemand == this.currentDemand) { + if (newDemand == this.cpuFreqDemand) { return; } - this.currentDemand = newDemand; + this.cpuFreqDemand = newDemand; this.machineEdge.pushDemand(newDemand); } @@ -257,6 +280,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { if (this.machineEdge == null) { return; } + this.stopWorkload(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java index 550c2135..a09206a1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.workload; +package org.opendc.simulator.compute.workload.trace; public record TraceFragment(long duration, double cpuUsage, int coreCount) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 7f82ab71..47292a7b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -20,10 +20,15 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.workload; +package org.opendc.simulator.compute.workload.trace; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.Workload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; import org.opendc.simulator.engine.graph.FlowSupplier; public class TraceWorkload implements Workload { @@ -31,20 +36,40 @@ public class TraceWorkload implements Workload { private final long checkpointInterval; private final long checkpointDuration; private final double checkpointIntervalScaling; + private final double maxCpuDemand; + private final int maxCoreCount; + + public ScalingPolicy getScalingPolicy() { + return scalingPolicy; + } + + private final ScalingPolicy scalingPolicy; public TraceWorkload( ArrayList<TraceFragment> fragments, long checkpointInterval, long checkpointDuration, - double checkpointIntervalScaling) { + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { this.fragments = fragments; this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; this.checkpointIntervalScaling = checkpointIntervalScaling; + this.scalingPolicy = scalingPolicy; + + // TODO: remove if we decide not to use it. + this.maxCpuDemand = fragments.stream() + .max(Comparator.comparing(TraceFragment::cpuUsage)) + .get() + .cpuUsage(); + this.maxCoreCount = fragments.stream() + .max(Comparator.comparing(TraceFragment::coreCount)) + .get() + .coreCount(); } public TraceWorkload(ArrayList<TraceFragment> fragments) { - this(fragments, 0L, 0L, 1.0); + this(fragments, 0L, 0L, 1.0, new NoDelayScaling()); } public ArrayList<TraceFragment> getFragments() { @@ -66,6 +91,14 @@ public class TraceWorkload implements Workload { return checkpointIntervalScaling; } + public int getMaxCoreCount() { + return maxCoreCount; + } + + public double getMaxCpuDemand() { + return maxCpuDemand; + } + public void removeFragments(int numberOfFragments) { if (numberOfFragments <= 0) { return; @@ -83,11 +116,15 @@ public class TraceWorkload implements Workload { } public static Builder builder() { - return builder(0L, 0L, 0.0); + return builder(0L, 0L, 0.0, new NoDelayScaling()); } - public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) { - return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling); + public static Builder builder( + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { + return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy); } /** @@ -125,15 +162,21 @@ public class TraceWorkload implements Workload { private final long checkpointInterval; private final long checkpointDuration; private final double checkpointIntervalScaling; + private final ScalingPolicy scalingPolicy; /** * Construct a new {@link Builder} instance. */ - private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) { + private Builder( + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { this.fragments = new ArrayList<>(); this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; this.checkpointIntervalScaling = checkpointIntervalScaling; + this.scalingPolicy = scalingPolicy; } /** @@ -152,7 +195,11 @@ public class TraceWorkload implements Workload { */ public TraceWorkload build() { return new TraceWorkload( - this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling); + this.fragments, + this.checkpointInterval, + this.checkpointDuration, + this.checkpointIntervalScaling, + this.scalingPolicy); } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java new file mode 100644 index 00000000..4230bb55 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * The NoDelay scaling policy states that there will be no delay + * when less CPU can be provided than needed. + * + * This could be used in situations where the data is streamed. + * This will also result in the same behaviour as older OpenDC. + */ +public class NoDelayScaling implements ScalingPolicy { + @Override + public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) { + return cpuFreqDemand * passedTime; + } + + @Override + public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) { + return (long) (remainingWork / cpuFreqDemand); + } + + @Override + public double getRemainingWork(double cpuFreqDemand, long duration) { + return cpuFreqDemand * duration; + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java new file mode 100644 index 00000000..7eae70e6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * PerfectScaling scales the workload duration perfectly + * based on the CPU capacity. + * + * This means that if a fragment has a duration of 10 min at 4000 mHz, + * it will take 20 min and 2000 mHz. + */ +public class PerfectScaling implements ScalingPolicy { + @Override + public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) { + return cpuFreqSupplied * passedTime; + } + + @Override + public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) { + return (long) (remainingWork / cpuFreqSupplied); + } + + @Override + public double getRemainingWork(double cpuFreqDemand, long duration) { + return cpuFreqDemand * duration; + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java new file mode 100644 index 00000000..a0f473ba --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * Interface for the scaling policy. + * A scaling decides how a TaskFragment should scale when it is not getting the demanded capacity + */ +public interface ScalingPolicy { + + /** + * Calculate how much work was finished based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param cpuFreqSupplied + * @param passedTime + * @return + */ + double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime); + + /** + * Calculate the remaining duration of this fragment based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param cpuFreqSupplied + * @param remainingWork + * @return + */ + long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork); + + /** + * Calculate how much work is remaining based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param duration + * @return + */ + double getRemainingWork(double cpuFreqDemand, long duration); +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index ad69a3d6..49baaf48 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -24,7 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.machine.SimMachine -import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceWorkload import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException |
