From f71e07f55a5176c5bd5447cdb3bcfebf2f5f47ee Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Tue, 7 Jan 2025 11:25:48 +0100 Subject: Updated the FlowDistributor (#285) * Updated the FlowDistributor to work in more cases and be more performant. * Removed old FlowDistributor --- .../opendc/simulator/engine/engine/FlowEngine.java | 8 +- .../simulator/engine/engine/FlowEventQueue.java | 206 +++++++++++++++++++++ .../simulator/engine/engine/FlowTimerQueue.java | 206 --------------------- .../simulator/engine/graph/FlowConsumer.java | 4 +- .../simulator/engine/graph/FlowDistributor.java | 160 +++++++++------- .../opendc/simulator/engine/graph/FlowEdge.java | 26 ++- .../opendc/simulator/engine/graph/FlowNode.java | 4 +- .../simulator/engine/graph/FlowSupplier.java | 4 +- 8 files changed, 329 insertions(+), 289 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 24476048..67540f4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable { /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. */ - private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + private final FlowEventQueue eventQueue = new FlowEventQueue(256); /** * The stack of engine invocations to occur in the future. @@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable { return; } - long deadline = timerQueue.peekDeadline(); + long deadline = eventQueue.peekDeadline(); if (deadline != Long.MAX_VALUE) { trySchedule(futureInvocations, clock.millis(), deadline); } @@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowTimerQueue timerQueue = this.timerQueue; + FlowEventQueue timerQueue = this.eventQueue; timerQueue.enqueue(ctx); } @@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable { */ private void doRunEngine(long now) { final FlowCycleQueue queue = this.cycleQueue; - final FlowTimerQueue timerQueue = this.timerQueue; + final FlowEventQueue timerQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java new file mode 100644 index 00000000..53649d29 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized priority queue for future event of {@link FlowNode}s sorted on time. + *

+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +public final class FlowEventQueue { + /** + * Array representation of binary heap of {@link FlowNode} instances. + */ + private FlowNode[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowEventQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowEventQueue(int initialCapacity) { + this.queue = new FlowNode[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + public void enqueue(FlowNode node) { + FlowNode[] es = queue; + int k = node.getTimerIndex(); + + if (node.getDeadline() != Long.MAX_VALUE) { + if (k >= 0) { + update(es, node, k); + } else { + add(es, node); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed now. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed now, otherwise null. + */ + public FlowNode poll(long now) { + if (this.size == 0) { + return null; + } + + final FlowNode[] es = queue; + final FlowNode head = es[0]; + + if (now < head.getDeadline()) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowNode next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.setTimerIndex(-1); + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + public long peekDeadline() { + if (this.size > 0) { + return this.queue[0].getDeadline(); + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowNode[] es, FlowNode node) { + if (this.size >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(this.size, node, es); + + this.size++; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowNode[] es, FlowNode node, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].getDeadline() > node.getDeadline()) { + siftUp(k, node, es); + return; + } + } + + siftDown(k, node, es, this.size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowNode[] es, int k) { + int s = --this.size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowNode moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowNode[] grow() { + FlowNode[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowNode key, FlowNode[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowNode e = es[parent]; + if (key.getDeadline() >= e.getDeadline()) break; + es[k] = e; + e.setTimerIndex(k); + k = parent; + } + es[k] = key; + key.setTimerIndex(k); + } + + private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowNode c = es[child]; + int right = child + 1; + if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; + + if (key.getDeadline() <= c.getDeadline()) break; + + es[k] = c; + c.setTimerIndex(k); + k = child; + } + + es[k] = key; + key.setTimerIndex(k); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java deleted file mode 100644 index 049eb40d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine.engine; - -import java.util.Arrays; -import org.opendc.simulator.engine.graph.FlowNode; - -/** - * A specialized priority queue for timers of {@link FlowNode}s. - *

- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation - * being generic. - */ -public final class FlowTimerQueue { - /** - * Array representation of binary heap of {@link FlowNode} instances. - */ - private FlowNode[] queue; - - /** - * The number of elements in the priority queue. - */ - private int size = 0; - - /** - * Construct a {@link FlowTimerQueue} with the specified initial capacity. - * - * @param initialCapacity The initial capacity of the queue. - */ - public FlowTimerQueue(int initialCapacity) { - this.queue = new FlowNode[initialCapacity]; - } - - /** - * Enqueue a timer for the specified context or update the existing timer. - */ - public void enqueue(FlowNode node) { - FlowNode[] es = queue; - int k = node.getTimerIndex(); - - if (node.getDeadline() != Long.MAX_VALUE) { - if (k >= 0) { - update(es, node, k); - } else { - add(es, node); - } - } else if (k >= 0) { - delete(es, k); - } - } - - /** - * Retrieve the head of the queue if its deadline does not exceed now. - * - * @param now The timestamp that the deadline of the head of the queue should not exceed. - * @return The head of the queue if its deadline does not exceed now, otherwise null. - */ - public FlowNode poll(long now) { - if (this.size == 0) { - return null; - } - - final FlowNode[] es = queue; - final FlowNode head = es[0]; - - if (now < head.getDeadline()) { - return null; - } - - int n = size - 1; - this.size = n; - final FlowNode next = es[n]; - es[n] = null; // Clear the last element of the queue - - if (n > 0) { - siftDown(0, next, es, n); - } - - head.setTimerIndex(-1); - return head; - } - - /** - * Find the earliest deadline in the queue. - */ - public long peekDeadline() { - if (this.size > 0) { - return this.queue[0].getDeadline(); - } - - return Long.MAX_VALUE; - } - - /** - * Add a new entry to the queue. - */ - private void add(FlowNode[] es, FlowNode node) { - if (this.size >= es.length) { - // Re-fetch the resized array - es = grow(); - } - - siftUp(this.size, node, es); - - this.size++; - } - - /** - * Update the deadline of an existing entry in the queue. - */ - private void update(FlowNode[] es, FlowNode node, int k) { - if (k > 0) { - int parent = (k - 1) >>> 1; - if (es[parent].getDeadline() > node.getDeadline()) { - siftUp(k, node, es); - return; - } - } - - siftDown(k, node, es, this.size); - } - - /** - * Deadline an entry from the queue. - */ - private void delete(FlowNode[] es, int k) { - int s = --this.size; - if (s == k) { - es[k] = null; // Element is last in the queue - } else { - FlowNode moved = es[s]; - es[s] = null; - - siftDown(k, moved, es, s); - - if (es[k] == moved) { - siftUp(k, moved, es); - } - } - } - - /** - * Increases the capacity of the array. - */ - private FlowNode[] grow() { - FlowNode[] queue = this.queue; - int oldCapacity = queue.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - - queue = Arrays.copyOf(queue, newCapacity); - this.queue = queue; - return queue; - } - - private static void siftUp(int k, FlowNode key, FlowNode[] es) { - while (k > 0) { - int parent = (k - 1) >>> 1; - FlowNode e = es[parent]; - if (key.getDeadline() >= e.getDeadline()) break; - es[k] = e; - e.setTimerIndex(k); - k = parent; - } - es[k] = key; - key.setTimerIndex(k); - } - - private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { - int half = n >>> 1; // loop while a non-leaf - while (k < half) { - int child = (k << 1) + 1; // assume left child is least - FlowNode c = es[child]; - int right = child + 1; - if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; - - if (key.getDeadline() <= c.getDeadline()) break; - - es[k] = c; - c.setTimerIndex(k); - k = child; - } - - es[k] = key; - key.setTimerIndex(k); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java index 2130d376..a9da6f5d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowConsumer { - void handleSupply(FlowEdge supplierEdge, double newSupply); + void handleIncomingSupply(FlowEdge supplierEdge, double newSupply); - void pushDemand(FlowEdge supplierEdge, double newDemand); + void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand); void addSupplierEdge(FlowEdge supplierEdge); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 7ef091f8..16bb161f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private final ArrayList consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; - private final ArrayList demands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList supplies = new ArrayList<>(); // What is supplied to the consumers + private final ArrayList incomingDemands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers - private double totalDemand; // The total demand of all the consumers - private double totalSupply; // The total supply from the supplier + private double totalIncomingDemand; // The total demand of all the consumers + private double currentIncomingSupply; // The current supply provided by the supplier - private boolean overLoaded = false; - private int currentConsumerIdx = -1; + private boolean outgoingDemandUpdateNeeded = false; - private double capacity; // What is the max capacity + private boolean overloaded = false; + + private double capacity; // What is the max capacity. Can probably be removed + + private final ArrayList updatedDemands = new ArrayList<>(); public FlowDistributor(FlowGraph graph) { super(graph); } - public double getTotalDemand() { - return totalDemand; + public double getTotalIncomingDemand() { + return totalIncomingDemand; } - public double getTotalSupply() { - return totalSupply; + public double getCurrentIncomingSupply() { + return currentIncomingSupply; } public double getCapacity() { @@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu public long onUpdate(long now) { + // Check if current supply is different from total demand + if (this.outgoingDemandUpdateNeeded) { + this.updateOutgoingDemand(); + + return Long.MAX_VALUE; + } + + this.updateOutgoingSupplies(); + return Long.MAX_VALUE; } - private void distributeSupply() { - // if supply >= demand -> push supplies to all tasks - if (this.totalSupply > this.totalDemand) { + private void updateOutgoingDemand() { + this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand); - // If this came from a state of overload, provide all consumers with their demand - if (this.overLoaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); - } - } + this.outgoingDemandUpdateNeeded = false; - if (this.currentConsumerIdx != -1) { - this.pushSupply( - this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); - this.currentConsumerIdx = -1; - } + this.invalidate(); + } - this.overLoaded = false; - } + private void updateOutgoingSupplies() { + + // If the demand is higher than the current supply, the system is overloaded. + // The available supply is distributed based on the current distribution function. + if (this.totalIncomingDemand > this.currentIncomingSupply) { + this.overloaded = true; - // if supply < demand -> distribute the supply over all consumers - else { - this.overLoaded = true; - double[] supplies = redistributeSupply(this.demands, this.totalSupply); + double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]); + } + + } else { + + // If the distributor was overloaded before, but is not anymore: + // provide all consumers with their demand + if (this.overloaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } + } + this.overloaded = false; + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } } } + + this.updatedDemands.clear(); } private record Demand(int idx, double value) {} @@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu /** * Distributed the available supply over the different demands. * The supply is distributed using MaxMin Fairness. - * - * TODO: Move this outside of the Distributor so we can easily add different redistribution methods */ - private static double[] redistributeSupply(ArrayList demands, double totalSupply) { + private static double[] distributeSupply(ArrayList demands, double currentSupply) { int inputSize = demands.size(); final double[] supplies = new double[inputSize]; @@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return i1.compareTo(i2); }); - double availableCapacity = totalSupply; // totalSupply + double availableCapacity = currentSupply; // totalSupply for (int i = 0; i < inputSize; i++) { double d = tempDemands[i].value; @@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu availableCapacity -= r; } - // Return the used capacity return supplies; } @@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu consumerEdge.setConsumerIndex(this.consumerEdges.size()); this.consumerEdges.add(consumerEdge); - this.demands.add(0.0); - this.supplies.add(0.0); + this.incomingDemands.add(0.0); + this.outgoingSupplies.add(0.0); } @Override public void addSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = supplierEdge; this.capacity = supplierEdge.getCapacity(); - this.totalSupply = 0; + this.currentIncomingSupply = 0; } @Override @@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return; } - this.totalDemand -= consumerEdge.getDemand(); + this.totalIncomingDemand -= consumerEdge.getDemand(); this.consumerEdges.remove(idx); - this.demands.remove(idx); - this.supplies.remove(idx); + this.incomingDemands.remove(idx); + this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. for (int i = idx; i < this.consumerEdges.size(); i++) { this.consumerEdges.get(i).setConsumerIndex(i); } - this.currentConsumerIdx = -1; + for (int i = 0; i < this.updatedDemands.size(); i++) { + int j = this.updatedDemands.get(i); - if (this.overLoaded) { - this.distributeSupply(); + if (j == idx) { + this.updatedDemands.remove(idx); + } + if (j > idx) { + this.updatedDemands.set(i, j - 1); + } } - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override public void removeSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = null; this.capacity = 0; - this.totalSupply = 0; + this.currentIncomingSupply = 0; + + this.invalidate(); } @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { int idx = consumerEdge.getConsumerIndex(); - this.currentConsumerIdx = idx; - if (idx == -1) { System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); return; } // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = demands.get(idx); + double prevDemand = incomingDemands.get(idx); - demands.set(idx, newDemand); - this.totalDemand += (newDemand - prevDemand); + incomingDemands.set(idx, newDemand); + this.totalIncomingDemand += (newDemand - prevDemand); - if (overLoaded) { - distributeSupply(); - } + this.updatedDemands.add(idx); - // Send new totalDemand to CPU - // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + this.currentIncomingSupply = newSupply; - this.distributeSupply(); + this.invalidate(); } @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.supplierEdge.pushDemand(newDemand); } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } - if (supplies.get(idx) == newSupply) { + if (outgoingSupplies.get(idx) == newSupply) { return; } - supplies.set(idx, newSupply); + outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index b7162508..9521f2ce 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -108,24 +108,38 @@ public class FlowEdge { /** * Push new demand from the Consumer to the Supplier */ - public void pushDemand(double newDemand) { - if (newDemand == this.demand) { + public void pushDemand(double newDemand, boolean forceThrough) { + if ((newDemand == this.demand) && !forceThrough) { return; } this.demand = newDemand; - this.supplier.handleDemand(this, newDemand); + this.supplier.handleIncomingDemand(this, newDemand); + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(double newDemand) { + this.pushDemand(newDemand, false); } /** * Push new supply from the Supplier to the Consumer */ - public void pushSupply(double newSupply) { - if (newSupply == this.supply) { + public void pushSupply(double newSupply, boolean forceThrough) { + if ((newSupply == this.supply) && !forceThrough) { return; } this.supply = newSupply; - this.consumer.handleSupply(this, newSupply); + this.consumer.handleIncomingSupply(this, newSupply); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(double newSupply) { + this.pushSupply(newSupply, false); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index 64cd0d8c..e24e9f93 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph; import java.time.InstantSource; import org.opendc.simulator.engine.engine.FlowEngine; -import org.opendc.simulator.engine.engine.FlowTimerQueue; +import org.opendc.simulator.engine.engine.FlowEventQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +109,7 @@ public abstract class FlowNode { private long deadline = Long.MAX_VALUE; /** - * The index of the timer in the {@link FlowTimerQueue}. + * The index of the timer in the {@link FlowEventQueue}. */ private int timerIndex = -1; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java index 84602ee0..da65392b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowSupplier { - void handleDemand(FlowEdge consumerEdge, double newDemand); + void handleIncomingDemand(FlowEdge consumerEdge, double newDemand); - void pushSupply(FlowEdge consumerEdge, double newSupply); + void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply); void addConsumerEdge(FlowEdge consumerEdge); -- cgit v1.2.3