summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main/java
diff options
context:
space:
mode:
authorDante Niewenhuis <d.niewenhuis@hotmail.com>2025-01-24 13:54:59 +0100
committerGitHub <noreply@github.com>2025-01-24 13:54:59 +0100
commitbe9698483f8e7891b5c2d562eaeac9dd3edbf9d8 (patch)
tree60b27e2ff80f76c5aa7736ca64f2ae0580348930 /opendc-simulator/opendc-simulator-flow/src/main/java
parentbb945c2fdd7b20898e3dfccbac7da2a427418216 (diff)
Added Fragment scaling (#296)
* Added maxCpuDemand to TraceWorkload, don't know if this will be needed so might remove later. Updated SimTraceWorkload to properly handle creating checkpoints Fixed a bug with the updatedConsumers in the FlowDistributor Implemented a first version of scaling the runtime of fragments. * small update * updated tests to reflect the changes in the checkpointing model * Updated the checkpointing tests to reflect the changes made * updated wrapper-validation-action * Applied spotless
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java35
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java4
3 files changed, 31 insertions, 22 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
index 67540f4e..3f18ac76 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java
@@ -139,16 +139,16 @@ public final class FlowEngine implements Runnable {
* This method should only be invoked while inside an engine cycle.
*/
public void scheduleDelayedInContext(FlowNode ctx) {
- FlowEventQueue timerQueue = this.eventQueue;
- timerQueue.enqueue(ctx);
+ FlowEventQueue eventQueue = this.eventQueue;
+ eventQueue.enqueue(ctx);
}
/**
* Run all the enqueued actions for the specified timestamp (<code>now</code>).
*/
private void doRunEngine(long now) {
- final FlowCycleQueue queue = this.cycleQueue;
- final FlowEventQueue timerQueue = this.eventQueue;
+ final FlowCycleQueue cycleQueue = this.cycleQueue;
+ final FlowEventQueue eventQueue = this.eventQueue;
try {
// Mark the engine as active to prevent concurrent calls to this method
@@ -156,7 +156,7 @@ public final class FlowEngine implements Runnable {
// Execute all scheduled updates at current timestamp
while (true) {
- final FlowNode ctx = timerQueue.poll(now);
+ final FlowNode ctx = eventQueue.poll(now);
if (ctx == null) {
break;
}
@@ -166,7 +166,7 @@ public final class FlowEngine implements Runnable {
// Execute all immediate updates
while (true) {
- final FlowNode ctx = queue.poll();
+ final FlowNode ctx = cycleQueue.poll();
if (ctx == null) {
break;
}
@@ -178,7 +178,7 @@ public final class FlowEngine implements Runnable {
}
// Schedule an engine invocation for the next update to occur.
- long headDeadline = timerQueue.peekDeadline();
+ long headDeadline = eventQueue.peekDeadline();
if (headDeadline != Long.MAX_VALUE && headDeadline >= now) {
trySchedule(futureInvocations, now, headDeadline);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
index 16bb161f..ff7ff199 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java
@@ -24,6 +24,9 @@ package org.opendc.simulator.engine.graph;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer {
private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
@@ -36,13 +39,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
private double currentIncomingSupply; // The current supply provided by the supplier
private boolean outgoingDemandUpdateNeeded = false;
+ private final Set<Integer> updatedDemands =
+ new HashSet<>(); // Array of consumers that updated their demand in this cycle
private boolean overloaded = false;
private double capacity; // What is the max capacity. Can probably be removed
- private final ArrayList<Integer> updatedDemands = new ArrayList<>();
-
public FlowDistributor(FlowGraph graph) {
super(graph);
}
@@ -68,7 +71,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
return Long.MAX_VALUE;
}
- this.updateOutgoingSupplies();
+ if (!this.outgoingSupplies.isEmpty()) {
+ this.updateOutgoingSupplies();
+ }
return Long.MAX_VALUE;
}
@@ -100,7 +105,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
// provide all consumers with their demand
if (this.overloaded) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
- if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) {
+ if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) {
this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx));
}
}
@@ -190,23 +195,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.totalIncomingDemand -= consumerEdge.getDemand();
+ // Remove idx from consumers that updated their demands
+ this.updatedDemands.remove(idx);
+
this.consumerEdges.remove(idx);
this.incomingDemands.remove(idx);
this.outgoingSupplies.remove(idx);
// update the consumer index for all consumerEdges higher than this.
for (int i = idx; i < this.consumerEdges.size(); i++) {
- this.consumerEdges.get(i).setConsumerIndex(i);
+ FlowEdge other = this.consumerEdges.get(i);
+
+ other.setConsumerIndex(other.getConsumerIndex() - 1);
}
- for (int i = 0; i < this.updatedDemands.size(); i++) {
- int j = this.updatedDemands.get(i);
+ for (int idx_other : this.updatedDemands) {
- if (j == idx) {
- this.updatedDemands.remove(idx);
- }
- if (j > idx) {
- this.updatedDemands.set(i, j - 1);
+ if (idx_other > idx) {
+ this.updatedDemands.remove(idx_other);
+ this.updatedDemands.add(idx_other - 1);
}
}
@@ -220,7 +227,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu
this.capacity = 0;
this.currentIncomingSupply = 0;
- this.invalidate();
+ this.updatedDemands.clear();
+
+ this.closeNode();
}
@Override
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
index 0e6e137c..91662950 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java
@@ -63,7 +63,7 @@ public class FlowGraph {
// Remove all edges connected to node
final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node);
- while (connectedEdges.size() > 0) {
+ while (!connectedEdges.isEmpty()) {
removeEdge(connectedEdges.get(0));
}
@@ -90,7 +90,7 @@ public class FlowGraph {
throw new IllegalArgumentException("The consumer is not a node in this graph");
}
if (!(this.nodes.contains((FlowNode) flowSupplier))) {
- throw new IllegalArgumentException("The consumer is not a node in this graph");
+ throw new IllegalArgumentException("The supplier is not a node in this graph");
}
final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier);