summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-01-24 13:54:59 +0100
committerGitHub <noreply@github.com>2025-01-24 13:54:59 +0100
commitbe9698483f8e7891b5c2d562eaeac9dd3edbf9d8 (patch)
tree60b27e2ff80f76c5aa7736ca64f2ae0580348930 /opendc-simulator
parentbb945c2fdd7b20898e3dfccbac7da2a427418216 (diff)
Added Fragment scaling (#296)
* Added maxCpuDemand to TraceWorkload, don't know if this will be needed so might remove later. Updated SimTraceWorkload to properly handle creating checkpoints Fixed a bug with the updatedConsumers in the FlowDistributor Implemented a first version of scaling the runtime of fragments. * small update * updated tests to reflect the changes in the checkpointing model * Updated the checkpointing tests to reflect the changes made * updated wrapper-validation-action * Applied spotless
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java8
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java)120
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java)2
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java (renamed from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java)63
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java47
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java47
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java59
-rw-r--r--opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java4
11 files changed, 317 insertions, 84 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
index 2919fc3a..ebfcc552 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java
@@ -55,11 +55,11 @@ public abstract class SimWorkload extends FlowNode implements FlowConsumer {
public abstract Workload getSnapshot();
- abstract void createCheckpointModel();
+ public abstract void createCheckpointModel();
- abstract long getCheckpointInterval();
+ public abstract long getCheckpointInterval();
- abstract long getCheckpointDuration();
+ public abstract long getCheckpointDuration();
- abstract double getCheckpointIntervalScaling();
+ public abstract double getCheckpointIntervalScaling();
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
index 9b12b1e3..93733268 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java
@@ -20,9 +20,12 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.simulator.compute.workload.trace;
import java.util.LinkedList;
+import org.opendc.simulator.compute.workload.SimWorkload;
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling;
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy;
import org.opendc.simulator.engine.graph.FlowConsumer;
import org.opendc.simulator.engine.graph.FlowEdge;
import org.opendc.simulator.engine.graph.FlowGraph;
@@ -37,13 +40,18 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
private long startOfFragment;
private FlowEdge machineEdge;
- private double currentDemand;
- private double currentSupply;
+
+ private double cpuFreqDemand = 0.0; // The Cpu demanded by fragment
+ private double cpuFreqSupplied = 0.0; // The Cpu speed supplied
+ private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied
+ private double remainingWork = 0.0; // The duration of the fragment at the demanded speed
private long checkpointDuration;
private TraceWorkload snapshot;
+ private ScalingPolicy scalingPolicy = new NoDelayScaling();
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Basic Getters and Setters
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -57,27 +65,20 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
}
@Override
- long getCheckpointInterval() {
+ public long getCheckpointInterval() {
return 0;
}
@Override
- long getCheckpointDuration() {
+ public long getCheckpointDuration() {
return 0;
}
@Override
- double getCheckpointIntervalScaling() {
+ public double getCheckpointIntervalScaling() {
return 0;
}
- public TraceFragment getNextFragment() {
- this.currentFragment = this.remainingFragments.pop();
- this.fragmentIndex++;
-
- return this.currentFragment;
- }
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Constructors
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -87,15 +88,12 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.snapshot = workload;
this.checkpointDuration = workload.getCheckpointDuration();
+ this.scalingPolicy = workload.getScalingPolicy();
this.remainingFragments = new LinkedList<>(workload.getFragments());
this.fragmentIndex = 0;
final FlowGraph graph = ((FlowNode) supplier).getGraph();
graph.addEdge(this, supplier);
-
- this.currentFragment = this.getNextFragment();
- pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
- this.startOfFragment = now;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -105,36 +103,50 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
@Override
public long onUpdate(long now) {
long passedTime = getPassedTime(now);
- long duration = this.currentFragment.duration();
+ this.startOfFragment = now;
- // The current Fragment has not yet been finished, continue
- if (passedTime < duration) {
- return now + (duration - passedTime);
+ // The amount of work done since last update
+ double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime);
+
+ this.remainingWork -= finishedWork;
+
+ // If this.remainingWork <= 0, the fragment has been completed
+ if (this.remainingWork <= 0) {
+ this.startNextFragment();
+
+ this.invalidate();
+ return Long.MAX_VALUE;
}
- // Loop through fragments until the passed time is filled.
- // We need a while loop to account for skipping of fragments.
- while (passedTime >= duration) {
- if (this.remainingFragments.isEmpty()) {
- this.stopWorkload();
- return Long.MAX_VALUE;
- }
+ this.cpuFreqSupplied = this.newCpuFreqSupplied;
- passedTime = passedTime - duration;
+ // The amount of time required to finish the fragment at this speed
+ long remainingDuration = this.scalingPolicy.getRemainingDuration(
+ this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork);
- // get next Fragment
- currentFragment = this.getNextFragment();
- duration = currentFragment.duration();
+ return now + remainingDuration;
+ }
+
+ public TraceFragment getNextFragment() {
+ if (this.remainingFragments.isEmpty()) {
+ return null;
}
+ this.currentFragment = this.remainingFragments.pop();
+ this.fragmentIndex++;
- // start new fragment
- this.startOfFragment = now - passedTime;
+ return this.currentFragment;
+ }
- // Change the cpu Usage to the new Fragment
- pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage());
+ private void startNextFragment() {
- // Return the time when the current fragment will complete
- return this.startOfFragment + duration;
+ TraceFragment nextFragment = this.getNextFragment();
+ if (nextFragment == null) {
+ this.stopWorkload();
+ return;
+ }
+ double demand = nextFragment.cpuUsage();
+ this.remainingWork = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration());
+ this.pushOutgoingDemand(this.machineEdge, demand);
}
@Override
@@ -159,7 +171,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
* TODO: Maybe add checkpoint models for SimTraceWorkload
*/
@Override
- void createCheckpointModel() {}
+ public void createCheckpointModel() {}
/**
* Create a new snapshot based on the current status of the workload.
@@ -171,7 +183,15 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
// Get remaining time of current fragment
long passedTime = getPassedTime(now);
- long remainingTime = currentFragment.duration() - passedTime;
+
+ // The amount of work done since last update
+ double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime);
+
+ this.remainingWork -= finishedWork;
+
+ // The amount of time required to finish the fragment at this speed
+ long remainingTime =
+ this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork);
// If this is the end of the Task, don't make a snapshot
if (remainingTime <= 0 && remainingFragments.isEmpty()) {
@@ -189,13 +209,13 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
this.remainingFragments.addFirst(newFragment);
// Create and add a fragment for processing the snapshot process
- // TODO: improve the implementation of cpuUsage and coreCount
- TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, 123456, 1);
+ TraceFragment snapshotFragment = new TraceFragment(
+ this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount());
this.remainingFragments.addFirst(snapshotFragment);
this.fragmentIndex = -1;
- this.currentFragment = getNextFragment();
- pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage());
+ startNextFragment();
+
this.startOfFragment = now;
this.invalidate();
@@ -213,11 +233,14 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
*/
@Override
public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) {
- if (newSupply == this.currentSupply) {
+ if (newSupply == this.cpuFreqSupplied) {
return;
}
- this.currentSupply = newSupply;
+ this.cpuFreqSupplied = this.newCpuFreqSupplied;
+ this.newCpuFreqSupplied = newSupply;
+
+ this.invalidate();
}
/**
@@ -228,11 +251,11 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
*/
@Override
public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) {
- if (newDemand == this.currentDemand) {
+ if (newDemand == this.cpuFreqDemand) {
return;
}
- this.currentDemand = newDemand;
+ this.cpuFreqDemand = newDemand;
this.machineEdge.pushDemand(newDemand);
}
@@ -257,6 +280,7 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer {
if (this.machineEdge == null) {
return;
}
+
this.stopWorkload();
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java
index 550c2135..a09206a1 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.simulator.compute.workload.trace;
public record TraceFragment(long duration, double cpuUsage, int coreCount) {
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
index 7f82ab71..47292a7b 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java
@@ -20,10 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.simulator.compute.workload;
+package org.opendc.simulator.compute.workload.trace;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import org.opendc.simulator.compute.workload.SimWorkload;
+import org.opendc.simulator.compute.workload.Workload;
+import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling;
+import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy;
import org.opendc.simulator.engine.graph.FlowSupplier;
public class TraceWorkload implements Workload {
@@ -31,20 +36,40 @@ public class TraceWorkload implements Workload {
private final long checkpointInterval;
private final long checkpointDuration;
private final double checkpointIntervalScaling;
+ private final double maxCpuDemand;
+ private final int maxCoreCount;
+
+ public ScalingPolicy getScalingPolicy() {
+ return scalingPolicy;
+ }
+
+ private final ScalingPolicy scalingPolicy;
public TraceWorkload(
ArrayList<TraceFragment> fragments,
long checkpointInterval,
long checkpointDuration,
- double checkpointIntervalScaling) {
+ double checkpointIntervalScaling,
+ ScalingPolicy scalingPolicy) {
this.fragments = fragments;
this.checkpointInterval = checkpointInterval;
this.checkpointDuration = checkpointDuration;
this.checkpointIntervalScaling = checkpointIntervalScaling;
+ this.scalingPolicy = scalingPolicy;
+
+ // TODO: remove if we decide not to use it.
+ this.maxCpuDemand = fragments.stream()
+ .max(Comparator.comparing(TraceFragment::cpuUsage))
+ .get()
+ .cpuUsage();
+ this.maxCoreCount = fragments.stream()
+ .max(Comparator.comparing(TraceFragment::coreCount))
+ .get()
+ .coreCount();
}
public TraceWorkload(ArrayList<TraceFragment> fragments) {
- this(fragments, 0L, 0L, 1.0);
+ this(fragments, 0L, 0L, 1.0, new NoDelayScaling());
}
public ArrayList<TraceFragment> getFragments() {
@@ -66,6 +91,14 @@ public class TraceWorkload implements Workload {
return checkpointIntervalScaling;
}
+ public int getMaxCoreCount() {
+ return maxCoreCount;
+ }
+
+ public double getMaxCpuDemand() {
+ return maxCpuDemand;
+ }
+
public void removeFragments(int numberOfFragments) {
if (numberOfFragments <= 0) {
return;
@@ -83,11 +116,15 @@ public class TraceWorkload implements Workload {
}
public static Builder builder() {
- return builder(0L, 0L, 0.0);
+ return builder(0L, 0L, 0.0, new NoDelayScaling());
}
- public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) {
- return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling);
+ public static Builder builder(
+ long checkpointInterval,
+ long checkpointDuration,
+ double checkpointIntervalScaling,
+ ScalingPolicy scalingPolicy) {
+ return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy);
}
/**
@@ -125,15 +162,21 @@ public class TraceWorkload implements Workload {
private final long checkpointInterval;
private final long checkpointDuration;
private final double checkpointIntervalScaling;
+ private final ScalingPolicy scalingPolicy;
/**
* Construct a new {@link Builder} instance.
*/
- private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) {
+ private Builder(
+ long checkpointInterval,
+ long checkpointDuration,
+ double checkpointIntervalScaling,
+ ScalingPolicy scalingPolicy) {
this.fragments = new ArrayList<>();
this.checkpointInterval = checkpointInterval;
this.checkpointDuration = checkpointDuration;
this.checkpointIntervalScaling = checkpointIntervalScaling;
+ this.scalingPolicy = scalingPolicy;
}
/**
@@ -152,7 +195,11 @@ public class TraceWorkload implements Workload {
*/
public TraceWorkload build() {
return new TraceWorkload(
- this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling);
+ this.fragments,
+ this.checkpointInterval,
+ this.checkpointDuration,
+ this.checkpointIntervalScaling,
+ this.scalingPolicy);
}
}
}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java
new file mode 100644
index 00000000..4230bb55
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload.trace.scaling;
+
+/**
+ * The NoDelay scaling policy states that there will be no delay
+ * when less CPU can be provided than needed.
+ *
+ * This could be used in situations where the data is streamed.
+ * This will also result in the same behaviour as older OpenDC.
+ */
+public class NoDelayScaling implements ScalingPolicy {
+ @Override
+ public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) {
+ return cpuFreqDemand * passedTime;
+ }
+
+ @Override
+ public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) {
+ return (long) (remainingWork / cpuFreqDemand);
+ }
+
+ @Override
+ public double getRemainingWork(double cpuFreqDemand, long duration) {
+ return cpuFreqDemand * duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java
new file mode 100644
index 00000000..7eae70e6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload.trace.scaling;
+
+/**
+ * PerfectScaling scales the workload duration perfectly
+ * based on the CPU capacity.
+ *
+ * This means that if a fragment has a duration of 10 min at 4000 mHz,
+ * it will take 20 min and 2000 mHz.
+ */
+public class PerfectScaling implements ScalingPolicy {
+ @Override
+ public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) {
+ return cpuFreqSupplied * passedTime;
+ }
+
+ @Override
+ public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) {
+ return (long) (remainingWork / cpuFreqSupplied);
+ }
+
+ @Override
+ public double getRemainingWork(double cpuFreqDemand, long duration) {
+ return cpuFreqDemand * duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java
new file mode 100644
index 00000000..a0f473ba
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2025 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.compute.workload.trace.scaling;
+
+/**
+ * Interface for the scaling policy.
+ * A scaling decides how a TaskFragment should scale when it is not getting the demanded capacity
+ */
+public interface ScalingPolicy {
+
+ /**
+ * Calculate how much work was finished based on the demanded and supplied cpu
+ *
+ * @param cpuFreqDemand
+ * @param cpuFreqSupplied
+ * @param passedTime
+ * @return
+ */
+ double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime);
+
+ /**
+ * Calculate the remaining duration of this fragment based on the demanded and supplied cpu
+ *
+ * @param cpuFreqDemand
+ * @param cpuFreqSupplied
+ * @param remainingWork
+ * @return
+ */
+ long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork);
+
+ /**
+ * Calculate how much work is remaining based on the demanded and supplied cpu
+ *
+ * @param cpuFreqDemand
+ * @param duration
+ * @return
+ */
+ double getRemainingWork(double cpuFreqDemand, long duration);
+}
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
index ad69a3d6..49baaf48 100644
--- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
+++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt
@@ -24,7 +24,7 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.suspendCancellableCoroutine
import org.opendc.simulator.compute.machine.SimMachine
-import org.opendc.simulator.compute.workload.TraceWorkload
+import org.opendc.simulator.compute.workload.trace.TraceWorkload
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 67540f4e..3f18ac76 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -139,16 +139,16 @@ public final class FlowEngine implements Runnable {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleDelayedInContext(FlowNode ctx) {
- FlowEventQueue timerQueue = this.eventQueue;
- timerQueue.enqueue(ctx);
+ FlowEventQueue eventQueue = this.eventQueue;
+ eventQueue.enqueue(ctx);
}
/**
* Run all the enqueued actions for the specified timestamp (<code>now</code>).
*/
private void doRunEngine(long now) {
- final FlowCycleQueue queue = this.cycleQueue;
- final FlowEventQueue timerQueue = this.eventQueue;
+ final FlowCycleQueue cycleQueue = this.cycleQueue;
+ final FlowEventQueue eventQueue = this.eventQueue;
try {
// Mark the engine as active to prevent concurrent calls to this method
@@ -156,7 +156,7 @@ public final class FlowEngine implements Runnable {
// Execute all scheduled updates at current timestamp
while (true) {
- final FlowNode ctx = timerQueue.poll(now);
+ final FlowNode ctx = eventQueue.poll(now);
if (ctx == null) {
break;
}
@@ -166,7 +166,7 @@ public final class FlowEngine implements Runnable {
// Execute all immediate updates
while (true) {
- final FlowNode ctx = queue.poll();
+ final FlowNode ctx = cycleQueue.poll();
if (ctx == null) {
break;
}
@@ -178,7 +178,7 @@ public final class FlowEngine implements Runnable {
}
// Schedule an engine invocation for the next update to occur.
- long headDeadline = timerQueue.peekDeadline();
+ long headDeadline = eventQueue.peekDeadline();
if (headDeadline != Long.MAX_VALUE && headDeadline >= now) {
trySchedule(futureInvocations, now, headDeadline);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 16bb161f..ff7ff199 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -24,6 +24,9 @@ package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
@@ -36,13 +39,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
private double currentIncomingSupply; // The current supply provided by the supplier
private boolean outgoingDemandUpdateNeeded = false;
+ private final Set<Integer> updatedDemands =
+ new HashSet<>(); // Array of consumers that updated their demand in this cycle
private boolean overloaded = false;
private double capacity; // What is the max capacity. Can probably be removed
- private final ArrayList<Integer> updatedDemands = new ArrayList<>();
-
public FlowDistributor(FlowGraph graph) {
super(graph);
}
@@ -68,7 +71,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return Long.MAX_VALUE;
}
- this.updateOutgoingSupplies();
+ if (!this.outgoingSupplies.isEmpty()) {
+ this.updateOutgoingSupplies();
+ }
return Long.MAX_VALUE;
}
@@ -100,7 +105,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
// provide all consumers with their demand
if (this.overloaded) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) {
+ if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
}
}
@@ -190,23 +195,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.totalIncomingDemand -= consumerEdge.getDemand();
+ // Remove idx from consumers that updated their demands
+ this.updatedDemands.remove(idx);
+
this.consumerEdges.remove(idx);
this.incomingDemands.remove(idx);
this.outgoingSupplies.remove(idx);
// update the consumer index for all consumerEdges higher than this.
for (int i = idx; i < this.consumerEdges.size(); i++) {
- this.consumerEdges.get(i).setConsumerIndex(i);
+ FlowEdge other = this.consumerEdges.get(i);
+
+ other.setConsumerIndex(other.getConsumerIndex() - 1);
}
- for (int i = 0; i < this.updatedDemands.size(); i++) {
- int j = this.updatedDemands.get(i);
+ for (int idx_other : this.updatedDemands) {
- if (j == idx) {
- this.updatedDemands.remove(idx);
- }
- if (j > idx) {
- this.updatedDemands.set(i, j - 1);
+ if (idx_other > idx) {
+ this.updatedDemands.remove(idx_other);
+ this.updatedDemands.add(idx_other - 1);
}
}
@@ -220,7 +227,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.capacity = 0;
this.currentIncomingSupply = 0;
- this.invalidate();
+ this.updatedDemands.clear();
+
+ this.closeNode();
}
@Override
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
index 0e6e137c..91662950 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
@@ -63,7 +63,7 @@ public class FlowGraph {
// Remove all edges connected to node
final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node);
- while (connectedEdges.size() > 0) {
+ while (!connectedEdges.isEmpty()) {
removeEdge(connectedEdges.get(0));
}
@@ -90,7 +90,7 @@ public class FlowGraph {
throw new IllegalArgumentException("The consumer is not a node in this graph");
}
if (!(this.nodes.contains((FlowNode) flowSupplier))) {
- throw new IllegalArgumentException("The consumer is not a node in this graph");
+ throw new IllegalArgumentException("The supplier is not a node in this graph");
}
final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier);