diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java | 8 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java) | 8 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java | 4 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java | 160 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java | 26 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java | 4 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java | 4 |
7 files changed, 127 insertions, 87 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 24476048..67540f4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -44,7 +44,7 @@ public final class FlowEngine implements Runnable { /** * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. */ - private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + private final FlowEventQueue eventQueue = new FlowEventQueue(256); /** * The stack of engine invocations to occur in the future. @@ -127,7 +127,7 @@ public final class FlowEngine implements Runnable { return; } - long deadline = timerQueue.peekDeadline(); + long deadline = eventQueue.peekDeadline(); if (deadline != Long.MAX_VALUE) { trySchedule(futureInvocations, clock.millis(), deadline); } @@ -139,7 +139,7 @@ public final class FlowEngine implements Runnable { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowTimerQueue timerQueue = this.timerQueue; + FlowEventQueue timerQueue = this.eventQueue; timerQueue.enqueue(ctx); } @@ -148,7 +148,7 @@ public final class FlowEngine implements Runnable { */ private void doRunEngine(long now) { final FlowCycleQueue queue = this.cycleQueue; - final FlowTimerQueue timerQueue = this.timerQueue; + final FlowEventQueue timerQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java index 049eb40d..53649d29 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEventQueue.java @@ -26,12 +26,12 @@ import java.util.Arrays; import org.opendc.simulator.engine.graph.FlowNode; /** - * A specialized priority queue for timers of {@link FlowNode}s. + * A specialized priority queue for future event of {@link FlowNode}s sorted on time. * <p> * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation * being generic. */ -public final class FlowTimerQueue { +public final class FlowEventQueue { /** * Array representation of binary heap of {@link FlowNode} instances. */ @@ -43,11 +43,11 @@ public final class FlowTimerQueue { private int size = 0; /** - * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * Construct a {@link FlowEventQueue} with the specified initial capacity. * * @param initialCapacity The initial capacity of the queue. */ - public FlowTimerQueue(int initialCapacity) { + public FlowEventQueue(int initialCapacity) { this.queue = new FlowNode[initialCapacity]; } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java index 2130d376..a9da6f5d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowConsumer { - void handleSupply(FlowEdge supplierEdge, double newSupply); + void handleIncomingSupply(FlowEdge supplierEdge, double newSupply); - void pushDemand(FlowEdge supplierEdge, double newDemand); + void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand); void addSupplierEdge(FlowEdge supplierEdge); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 7ef091f8..16bb161f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -29,27 +29,30 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); private FlowEdge supplierEdge; - private final ArrayList<Double> demands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList<Double> supplies = new ArrayList<>(); // What is supplied to the consumers + private final ArrayList<Double> incomingDemands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList<Double> outgoingSupplies = new ArrayList<>(); // What is supplied to the consumers - private double totalDemand; // The total demand of all the consumers - private double totalSupply; // The total supply from the supplier + private double totalIncomingDemand; // The total demand of all the consumers + private double currentIncomingSupply; // The current supply provided by the supplier - private boolean overLoaded = false; - private int currentConsumerIdx = -1; + private boolean outgoingDemandUpdateNeeded = false; - private double capacity; // What is the max capacity + private boolean overloaded = false; + + private double capacity; // What is the max capacity. Can probably be removed + + private final ArrayList<Integer> updatedDemands = new ArrayList<>(); public FlowDistributor(FlowGraph graph) { super(graph); } - public double getTotalDemand() { - return totalDemand; + public double getTotalIncomingDemand() { + return totalIncomingDemand; } - public double getTotalSupply() { - return totalSupply; + public double getCurrentIncomingSupply() { + return currentIncomingSupply; } public double getCapacity() { @@ -58,38 +61,61 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu public long onUpdate(long now) { + // Check if current supply is different from total demand + if (this.outgoingDemandUpdateNeeded) { + this.updateOutgoingDemand(); + + return Long.MAX_VALUE; + } + + this.updateOutgoingSupplies(); + return Long.MAX_VALUE; } - private void distributeSupply() { - // if supply >= demand -> push supplies to all tasks - if (this.totalSupply > this.totalDemand) { + private void updateOutgoingDemand() { + this.pushOutgoingDemand(this.supplierEdge, this.totalIncomingDemand); - // If this came from a state of overload, provide all consumers with their demand - if (this.overLoaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); - } - } + this.outgoingDemandUpdateNeeded = false; - if (this.currentConsumerIdx != -1) { - this.pushSupply( - this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); - this.currentConsumerIdx = -1; - } + this.invalidate(); + } - this.overLoaded = false; - } + private void updateOutgoingSupplies() { + + // If the demand is higher than the current supply, the system is overloaded. + // The available supply is distributed based on the current distribution function. + if (this.totalIncomingDemand > this.currentIncomingSupply) { + this.overloaded = true; - // if supply < demand -> distribute the supply over all consumers - else { - this.overLoaded = true; - double[] supplies = redistributeSupply(this.demands, this.totalSupply); + double[] supplies = distributeSupply(this.incomingDemands, this.currentIncomingSupply); for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + this.pushOutgoingSupply(this.consumerEdges.get(idx), supplies[idx]); + } + + } else { + + // If the distributor was overloaded before, but is not anymore: + // provide all consumers with their demand + if (this.overloaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } + } + this.overloaded = false; + } + + // Update the supplies of the consumers that changed their demand in the current cycle + else { + for (int idx : this.updatedDemands) { + this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); + } } } + + this.updatedDemands.clear(); } private record Demand(int idx, double value) {} @@ -97,10 +123,8 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu /** * Distributed the available supply over the different demands. * The supply is distributed using MaxMin Fairness. - * - * TODO: Move this outside of the Distributor so we can easily add different redistribution methods */ - private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) { + private static double[] distributeSupply(ArrayList<Double> demands, double currentSupply) { int inputSize = demands.size(); final double[] supplies = new double[inputSize]; @@ -116,7 +140,7 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return i1.compareTo(i2); }); - double availableCapacity = totalSupply; // totalSupply + double availableCapacity = currentSupply; // totalSupply for (int i = 0; i < inputSize; i++) { double d = tempDemands[i].value; @@ -133,7 +157,6 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu availableCapacity -= r; } - // Return the used capacity return supplies; } @@ -146,15 +169,15 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu consumerEdge.setConsumerIndex(this.consumerEdges.size()); this.consumerEdges.add(consumerEdge); - this.demands.add(0.0); - this.supplies.add(0.0); + this.incomingDemands.add(0.0); + this.outgoingSupplies.add(0.0); } @Override public void addSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = supplierEdge; this.capacity = supplierEdge.getCapacity(); - this.totalSupply = 0; + this.currentIncomingSupply = 0; } @Override @@ -165,84 +188,87 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu return; } - this.totalDemand -= consumerEdge.getDemand(); + this.totalIncomingDemand -= consumerEdge.getDemand(); this.consumerEdges.remove(idx); - this.demands.remove(idx); - this.supplies.remove(idx); + this.incomingDemands.remove(idx); + this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. for (int i = idx; i < this.consumerEdges.size(); i++) { this.consumerEdges.get(i).setConsumerIndex(i); } - this.currentConsumerIdx = -1; + for (int i = 0; i < this.updatedDemands.size(); i++) { + int j = this.updatedDemands.get(i); - if (this.overLoaded) { - this.distributeSupply(); + if (j == idx) { + this.updatedDemands.remove(idx); + } + if (j > idx) { + this.updatedDemands.set(i, j - 1); + } } - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override public void removeSupplierEdge(FlowEdge supplierEdge) { this.supplierEdge = null; this.capacity = 0; - this.totalSupply = 0; + this.currentIncomingSupply = 0; + + this.invalidate(); } @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { + public void handleIncomingDemand(FlowEdge consumerEdge, double newDemand) { int idx = consumerEdge.getConsumerIndex(); - this.currentConsumerIdx = idx; - if (idx == -1) { System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); return; } // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = demands.get(idx); + double prevDemand = incomingDemands.get(idx); - demands.set(idx, newDemand); - this.totalDemand += (newDemand - prevDemand); + incomingDemands.set(idx, newDemand); + this.totalIncomingDemand += (newDemand - prevDemand); - if (overLoaded) { - distributeSupply(); - } + this.updatedDemands.add(idx); - // Send new totalDemand to CPU - // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) - this.pushDemand(this.supplierEdge, this.totalDemand); + this.outgoingDemandUpdateNeeded = true; + this.invalidate(); } @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; + public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { + this.currentIncomingSupply = newSupply; - this.distributeSupply(); + this.invalidate(); } @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { + public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { this.supplierEdge.pushDemand(newDemand); } @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { + public void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply) { int idx = consumerEdge.getConsumerIndex(); if (idx == -1) { System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); } - if (supplies.get(idx) == newSupply) { + if (outgoingSupplies.get(idx) == newSupply) { return; } - supplies.set(idx, newSupply); + outgoingSupplies.set(idx, newSupply); consumerEdge.pushSupply(newSupply); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java index b7162508..9521f2ce 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -108,24 +108,38 @@ public class FlowEdge { /** * Push new demand from the Consumer to the Supplier */ - public void pushDemand(double newDemand) { - if (newDemand == this.demand) { + public void pushDemand(double newDemand, boolean forceThrough) { + if ((newDemand == this.demand) && !forceThrough) { return; } this.demand = newDemand; - this.supplier.handleDemand(this, newDemand); + this.supplier.handleIncomingDemand(this, newDemand); + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(double newDemand) { + this.pushDemand(newDemand, false); } /** * Push new supply from the Supplier to the Consumer */ - public void pushSupply(double newSupply) { - if (newSupply == this.supply) { + public void pushSupply(double newSupply, boolean forceThrough) { + if ((newSupply == this.supply) && !forceThrough) { return; } this.supply = newSupply; - this.consumer.handleSupply(this, newSupply); + this.consumer.handleIncomingSupply(this, newSupply); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(double newSupply) { + this.pushSupply(newSupply, false); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java index 64cd0d8c..e24e9f93 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -24,7 +24,7 @@ package org.opendc.simulator.engine.graph; import java.time.InstantSource; import org.opendc.simulator.engine.engine.FlowEngine; -import org.opendc.simulator.engine.engine.FlowTimerQueue; +import org.opendc.simulator.engine.engine.FlowEventQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +109,7 @@ public abstract class FlowNode { private long deadline = Long.MAX_VALUE; /** - * The index of the timer in the {@link FlowTimerQueue}. + * The index of the timer in the {@link FlowEventQueue}. */ private int timerIndex = -1; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java index 84602ee0..da65392b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -24,9 +24,9 @@ package org.opendc.simulator.engine.graph; public interface FlowSupplier { - void handleDemand(FlowEdge consumerEdge, double newDemand); + void handleIncomingDemand(FlowEdge consumerEdge, double newDemand); - void pushSupply(FlowEdge consumerEdge, double newSupply); + void pushOutgoingSupply(FlowEdge consumerEdge, double newSupply); void addConsumerEdge(FlowEdge consumerEdge); |
