diff options
| author | Dante Niewenhuis <d.niewenhuis@hotmail.com> | 2025-01-24 13:54:59 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-24 13:54:59 +0100 |
| commit | be9698483f8e7891b5c2d562eaeac9dd3edbf9d8 (patch) | |
| tree | 60b27e2ff80f76c5aa7736ca64f2ae0580348930 /opendc-simulator/opendc-simulator-flow/src/main/java | |
| parent | bb945c2fdd7b20898e3dfccbac7da2a427418216 (diff) | |
Added Fragment scaling (#296)
* Added maxCpuDemand to TraceWorkload, don't know if this will be needed so might remove later.
Updated SimTraceWorkload to properly handle creating checkpoints
Fixed a bug with the updatedConsumers in the FlowDistributor
Implemented a first version of scaling the runtime of fragments.
* small update
* updated tests to reflect the changes in the checkpointing model
* Updated the checkpointing tests to reflect the changes made
* updated wrapper-validation-action
* Applied spotless
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
3 files changed, 31 insertions, 22 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 67540f4e..3f18ac76 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -139,16 +139,16 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowEventQueue timerQueue = this.eventQueue; - timerQueue.enqueue(ctx); + FlowEventQueue eventQueue = this.eventQueue; + eventQueue.enqueue(ctx); } /** * Run all the enqueued actions for the specified timestamp (<code>now</code>). */ private void doRunEngine(long now) { - final FlowCycleQueue queue = this.cycleQueue; - final FlowEventQueue timerQueue = this.eventQueue; + final FlowCycleQueue cycleQueue = this.cycleQueue; + final FlowEventQueue eventQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method @@ -156,7 +156,7 @@ public final class FlowEngine implements Runnable { // Execute all scheduled updates at current timestamp while (true) { - final FlowNode ctx = timerQueue.poll(now); + final FlowNode ctx = eventQueue.poll(now); if (ctx == null) { break; } @@ -166,7 +166,7 @@ public final class FlowEngine implements Runnable { // Execute all immediate updates while (true) { - final FlowNode ctx = queue.poll(); + final FlowNode ctx = cycleQueue.poll(); if (ctx == null) { break; } @@ -178,7 +178,7 @@ public final class FlowEngine implements Runnable { } // Schedule an engine invocation for the next update to occur. - long headDeadline = timerQueue.peekDeadline(); + long headDeadline = eventQueue.peekDeadline(); if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { trySchedule(futureInvocations, now, headDeadline); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 16bb161f..ff7ff199 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -24,6 +24,9 @@ package org.opendc.simulator.engine.graph; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); @@ -36,13 +39,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private double currentIncomingSupply; // The current supply provided by the supplier private boolean outgoingDemandUpdateNeeded = false; + private final Set<Integer> updatedDemands = + new HashSet<>(); // Array of consumers that updated their demand in this cycle private boolean overloaded = false; private double capacity; // What is the max capacity. Can probably be removed - private final ArrayList<Integer> updatedDemands = new ArrayList<>(); - public FlowDistributor(FlowGraph graph) { super(graph); } @@ -68,7 +71,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return Long.MAX_VALUE; } - this.updateOutgoingSupplies(); + if (!this.outgoingSupplies.isEmpty()) { + this.updateOutgoingSupplies(); + } return Long.MAX_VALUE; } @@ -100,7 +105,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu // provide all consumers with their demand if (this.overloaded) { for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); } } @@ -190,23 +195,25 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu this.totalIncomingDemand -= consumerEdge.getDemand(); + // Remove idx from consumers that updated their demands + this.updatedDemands.remove(idx); + this.consumerEdges.remove(idx); this.incomingDemands.remove(idx); this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. for (int i = idx; i < this.consumerEdges.size(); i++) { - this.consumerEdges.get(i).setConsumerIndex(i); + FlowEdge other = this.consumerEdges.get(i); + + other.setConsumerIndex(other.getConsumerIndex() - 1); } - for (int i = 0; i < this.updatedDemands.size(); i++) { - int j = this.updatedDemands.get(i); + for (int idx_other : this.updatedDemands) { - if (j == idx) { - this.updatedDemands.remove(idx); - } - if (j > idx) { - this.updatedDemands.set(i, j - 1); + if (idx_other > idx) { + this.updatedDemands.remove(idx_other); + this.updatedDemands.add(idx_other - 1); } } @@ -220,7 +227,9 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu this.capacity = 0; this.currentIncomingSupply = 0; - this.invalidate(); + this.updatedDemands.clear(); + + this.closeNode(); } @Override diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java index 0e6e137c..91662950 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -63,7 +63,7 @@ public class FlowGraph { // Remove all edges connected to node final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node); - while (connectedEdges.size() > 0) { + while (!connectedEdges.isEmpty()) { removeEdge(connectedEdges.get(0)); } @@ -90,7 +90,7 @@ public class FlowGraph { throw new IllegalArgumentException("The consumer is not a node in this graph"); } if (!(this.nodes.contains((FlowNode) flowSupplier))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); + throw new IllegalArgumentException("The supplier is not a node in this graph"); } final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); |
