From 8bbc3de611f9a679b5fb542241d32f887b4fe921 Mon Sep 17 00:00:00 2001 From: Dante Niewenhuis Date: Fri, 6 Dec 2024 15:44:09 +0100 Subject: Renamed Multiplexer to FlowDistributor (#282) * Restructured opendc-simulator-flow. Renamed Multiplexer to FlowDistributor. * spotless applied * Added FlowDistributor topologies back --- .../java/org/opendc/simulator/Multiplexer.java | 247 -------------------- .../org/opendc/simulator/engine/FlowConsumer.java | 34 --- .../java/org/opendc/simulator/engine/FlowEdge.java | 131 ----------- .../org/opendc/simulator/engine/FlowEngine.java | 204 ----------------- .../org/opendc/simulator/engine/FlowGraph.java | 112 ---------- .../java/org/opendc/simulator/engine/FlowNode.java | 191 ---------------- .../org/opendc/simulator/engine/FlowNodeQueue.java | 109 --------- .../org/opendc/simulator/engine/FlowSupplier.java | 36 --- .../opendc/simulator/engine/FlowTimerQueue.java | 205 ----------------- .../opendc/simulator/engine/InvocationStack.java | 94 -------- .../opendc/simulator/engine/engine/FlowEngine.java | 206 +++++++++++++++++ .../simulator/engine/engine/FlowNodeQueue.java | 110 +++++++++ .../simulator/engine/engine/FlowTimerQueue.java | 206 +++++++++++++++++ .../simulator/engine/engine/InvocationStack.java | 94 ++++++++ .../simulator/engine/graph/FlowConsumer.java | 34 +++ .../simulator/engine/graph/FlowDistributor.java | 248 +++++++++++++++++++++ .../opendc/simulator/engine/graph/FlowEdge.java | 131 +++++++++++ .../opendc/simulator/engine/graph/FlowGraph.java | 113 ++++++++++ .../opendc/simulator/engine/graph/FlowNode.java | 230 +++++++++++++++++++ .../simulator/engine/graph/FlowSupplier.java | 36 +++ .../src/test/kotlin/InvocationStackTest.kt | 2 +- 21 files changed, 1409 insertions(+), 1364 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java deleted file mode 100644 index 48177412..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator; - -import java.util.ArrayList; -import java.util.Arrays; -import org.opendc.simulator.engine.FlowConsumer; -import org.opendc.simulator.engine.FlowEdge; -import org.opendc.simulator.engine.FlowGraph; -import org.opendc.simulator.engine.FlowNode; -import org.opendc.simulator.engine.FlowSupplier; - -public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer { - private final ArrayList consumerEdges = new ArrayList<>(); - private FlowEdge supplierEdge; - - private final ArrayList demands = new ArrayList<>(); // What is demanded by the consumers - private final ArrayList supplies = new ArrayList<>(); // What is supplied to the consumers - - private double totalDemand; // The total demand of all the consumers - private double totalSupply; // The total supply from the supplier - - private boolean overLoaded = false; - private int currentConsumerIdx = -1; - - private double capacity; // What is the max capacity - - public Multiplexer(FlowGraph graph) { - super(graph); - } - - public double getTotalDemand() { - return totalDemand; - } - - public double getTotalSupply() { - return totalSupply; - } - - public double getCapacity() { - return capacity; - } - - public long onUpdate(long now) { - - return Long.MAX_VALUE; - } - - private void distributeSupply() { - // if supply >= demand -> push supplies to all tasks - if (this.totalSupply > this.totalDemand) { - - // If this came from a state of over provisioning, provide all consumers with their demand - if (this.overLoaded) { - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); - } - } - - if (this.currentConsumerIdx != -1) { - this.pushSupply( - this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); - this.currentConsumerIdx = -1; - } - - this.overLoaded = false; - } - - // if supply < demand -> distribute the supply over all consumers - else { - this.overLoaded = true; - double[] supplies = redistributeSupply(this.demands, this.totalSupply); - - for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); - } - } - } - - private record Demand(int idx, double value) {} - - private static double[] redistributeSupply(ArrayList demands, double totalSupply) { - int inputSize = demands.size(); - - final double[] supplies = new double[inputSize]; - final Demand[] tempDemands = new Demand[inputSize]; - - for (int i = 0; i < inputSize; i++) { - tempDemands[i] = new Demand(i, demands.get(i)); - } - - Arrays.sort(tempDemands, (o1, o2) -> { - Double i1 = o1.value; - Double i2 = o2.value; - return i1.compareTo(i2); - }); - - double availableCapacity = totalSupply; // totalSupply - - for (int i = 0; i < inputSize; i++) { - double d = tempDemands[i].value; - - if (d == 0.0) { - continue; - } - - double availableShare = availableCapacity / (inputSize - i); - double r = Math.min(d, availableShare); - - int idx = tempDemands[i].idx; - supplies[idx] = r; // Update the rates - availableCapacity -= r; - } - - // Return the used capacity - return supplies; - } - - /** - * Add a new consumer. - * Set its demand and supply to 0.0 - */ - @Override - public void addConsumerEdge(FlowEdge consumerEdge) { - consumerEdge.setConsumerIndex(this.consumerEdges.size()); - - this.consumerEdges.add(consumerEdge); - this.demands.add(0.0); - this.supplies.add(0.0); - } - - @Override - public void addSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = supplierEdge; - this.capacity = supplierEdge.getCapacity(); - this.totalSupply = 0; - } - - @Override - public void removeConsumerEdge(FlowEdge consumerEdge) { - int idx = consumerEdge.getConsumerIndex(); - - if (idx == -1) { - return; - } - - this.totalDemand -= consumerEdge.getDemand(); - - this.consumerEdges.remove(idx); - this.demands.remove(idx); - this.supplies.remove(idx); - - // update the consumer index for all consumerEdges higher than this. - for (int i = idx; i < this.consumerEdges.size(); i++) { - this.consumerEdges.get(i).setConsumerIndex(i); - } - - this.currentConsumerIdx = -1; - - if (this.overLoaded) { - this.distributeSupply(); - } - - this.pushDemand(this.supplierEdge, this.totalDemand); - } - - @Override - public void removeSupplierEdge(FlowEdge supplierEdge) { - this.supplierEdge = null; - this.capacity = 0; - this.totalSupply = 0; - } - - @Override - public void handleDemand(FlowEdge consumerEdge, double newDemand) { - int idx = consumerEdge.getConsumerIndex(); - - this.currentConsumerIdx = idx; - - if (idx == -1) { - System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer"); - return; - } - - // Update the total demand (This is cheaper than summing over all demands) - double prevDemand = demands.get(idx); - - demands.set(idx, newDemand); - this.totalDemand += (newDemand - prevDemand); - - if (overLoaded) { - distributeSupply(); - } - - // Send new totalDemand to CPU - // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) - this.pushDemand(this.supplierEdge, this.totalDemand); - } - - @Override - public void handleSupply(FlowEdge supplierEdge, double newSupply) { - this.totalSupply = newSupply; - - this.distributeSupply(); - } - - @Override - public void pushDemand(FlowEdge supplierEdge, double newDemand) { - this.supplierEdge.pushDemand(newDemand); - } - - @Override - public void pushSupply(FlowEdge consumerEdge, double newSupply) { - int idx = consumerEdge.getConsumerIndex(); - - if (idx == -1) { - System.out.println("Error (Multiplexer): pushing supply to an unknown consumer"); - } - - if (supplies.get(idx) == newSupply) { - return; - } - - supplies.set(idx, newSupply); - consumerEdge.pushSupply(newSupply); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java deleted file mode 100644 index ddb40794..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -public interface FlowConsumer { - - void handleSupply(FlowEdge supplierEdge, double newSupply); - - void pushDemand(FlowEdge supplierEdge, double newDemand); - - void addSupplierEdge(FlowEdge supplierEdge); - - void removeSupplierEdge(FlowEdge supplierEdge); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java deleted file mode 100644 index 95fe7928..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -/** - * An edge that connects two FlowStages. - * A connection between FlowStages always consist of a FlowStage that demands - * something, and a FlowStage that Delivers something - * For instance, this could be the connection between a workload, and its machine - */ -public class FlowEdge { - private FlowConsumer consumer; - private FlowSupplier supplier; - - private int consumerIndex = -1; - private int supplierIndex = -1; - - private double demand = 0.0; - private double supply = 0.0; - - private double capacity; - - public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) { - if (!(consumer instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - if (!(supplier instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - - this.consumer = consumer; - this.supplier = supplier; - - this.capacity = supplier.getCapacity(); - - this.consumer.addSupplierEdge(this); - this.supplier.addConsumerEdge(this); - } - - public void close() { - if (this.consumer != null) { - this.consumer.removeSupplierEdge(this); - this.consumer = null; - } - - if (this.supplier != null) { - this.supplier.removeConsumerEdge(this); - this.supplier = null; - } - } - - public FlowConsumer getConsumer() { - return consumer; - } - - public FlowSupplier getSupplier() { - return supplier; - } - - public double getCapacity() { - return capacity; - } - - public double getDemand() { - return this.demand; - } - - public double getSupply() { - return this.supply; - } - - public int getConsumerIndex() { - return consumerIndex; - } - - public void setConsumerIndex(int consumerIndex) { - this.consumerIndex = consumerIndex; - } - - public int getSupplierIndex() { - return supplierIndex; - } - - public void setSupplierIndex(int supplierIndex) { - this.supplierIndex = supplierIndex; - } - - /** - * Push new demand from the Consumer to the Supplier - */ - public void pushDemand(double newDemand) { - if (newDemand == this.demand) { - return; - } - - this.demand = newDemand; - this.supplier.handleDemand(this, newDemand); - } - - /** - * Push new supply from the Supplier to the Consumer - */ - public void pushSupply(double newSupply) { - if (newSupply == this.supply) { - return; - } - - this.supply = newSupply; - this.consumer.handleSupply(this, newSupply); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java deleted file mode 100644 index 10af7c51..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.time.Clock; -import java.time.InstantSource; -import kotlin.coroutines.CoroutineContext; -import org.opendc.common.Dispatcher; - -/** - * A {@link FlowEngine} simulates a generic flow network. - *

- * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation - * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. - */ -public final class FlowEngine implements Runnable { - /** - * The queue of {@link FlowNode} updates that are scheduled for immediate execution. - */ - private final FlowNodeQueue queue = new FlowNodeQueue(256); - - /** - * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. - */ - private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); - - /** - * The stack of engine invocations to occur in the future. - */ - private final InvocationStack futureInvocations = new InvocationStack(256); - - /** - * A flag to indicate that the engine is active. - */ - private boolean active; - - private final Dispatcher dispatcher; - private final InstantSource clock; - - /** - * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}. - */ - public static FlowEngine create(Dispatcher dispatcher) { - return new FlowEngine(dispatcher); - } - - FlowEngine(Dispatcher dispatcher) { - this.dispatcher = dispatcher; - this.clock = dispatcher.getTimeSource(); - } - - /** - * Obtain the (virtual) {@link Clock} driving the simulation. - */ - public InstantSource getClock() { - return clock; - } - - /** - * Return a new {@link FlowGraph} that can be used to build a flow network. - */ - public FlowGraph newGraph() { - return new FlowGraph(this); - } - - /** - * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. - *

- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be - * re-computed. - */ - void scheduleImmediate(long now, FlowNode ctx) { - scheduleImmediateInContext(ctx); - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (active) { - return; - } - - trySchedule(futureInvocations, now, now); - } - - /** - * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. - *

- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be - * re-computed. - *

- * This method should only be invoked while inside an engine cycle. - */ - void scheduleImmediateInContext(FlowNode ctx) { - queue.add(ctx); - } - - /** - * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. - */ - void scheduleDelayed(FlowNode ctx) { - scheduleDelayedInContext(ctx); - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (active) { - return; - } - - long deadline = timerQueue.peekDeadline(); - if (deadline != Long.MAX_VALUE) { - trySchedule(futureInvocations, clock.millis(), deadline); - } - } - - /** - * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. - *

- * This method should only be invoked while inside an engine cycle. - */ - void scheduleDelayedInContext(FlowNode ctx) { - FlowTimerQueue timerQueue = this.timerQueue; - timerQueue.enqueue(ctx); - } - - /** - * Run all the enqueued actions for the specified timestamp (now). - */ - private void doRunEngine(long now) { - final FlowNodeQueue queue = this.queue; - final FlowTimerQueue timerQueue = this.timerQueue; - - try { - // Mark the engine as active to prevent concurrent calls to this method - active = true; - - // Execute all scheduled updates at current timestamp - while (true) { - final FlowNode ctx = timerQueue.poll(now); - if (ctx == null) { - break; - } - - ctx.update(now); - } - - // Execute all immediate updates - while (true) { - final FlowNode ctx = queue.poll(); - if (ctx == null) { - break; - } - - ctx.update(now); - } - } finally { - active = false; - } - - // Schedule an engine invocation for the next update to occur. - long headDeadline = timerQueue.peekDeadline(); - if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { - trySchedule(futureInvocations, now, headDeadline); - } - } - - @Override - public void run() { - doRunEngine(futureInvocations.poll()); - } - - /** - * Try to schedule an engine invocation at the specified [target]. - * - * @param scheduled The queue of scheduled invocations. - * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the engine invocation should happen. - */ - private void trySchedule(InvocationStack scheduled, long now, long target) { - // Only schedule a new scheduler invocation in case the target is earlier than all other pending - // scheduler invocations - if (scheduled.tryAdd(target)) { - dispatcher.schedule(target - now, this); - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java deleted file mode 100644 index d82b542b..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.ArrayList; -import java.util.HashMap; - -public class FlowGraph { - private final FlowEngine engine; - private final ArrayList nodes = new ArrayList<>(); - private final ArrayList edges = new ArrayList<>(); - private final HashMap> nodeToEdge = new HashMap<>(); - - public FlowGraph(FlowEngine engine) { - this.engine = engine; - } - - /** - * Return the {@link FlowEngine} driving the simulation of the graph. - */ - public FlowEngine getEngine() { - return engine; - } - - /** - * Create a new {@link FlowNode} representing a node in the flow network. - */ - public void addNode(FlowNode node) { - if (nodes.contains(node)) { - System.out.println("Node already exists"); - } - nodes.add(node); - nodeToEdge.put(node, new ArrayList<>()); - long now = this.engine.getClock().millis(); - node.invalidate(now); - } - - /** - * Internal method to remove the specified {@link FlowNode} from the graph. - */ - public void removeNode(FlowNode node) { - - // Remove all edges connected to node - final ArrayList connectedEdges = nodeToEdge.get(node); - while (connectedEdges.size() > 0) { - removeEdge(connectedEdges.get(0)); - } - - nodeToEdge.remove(node); - - // remove the node - nodes.remove(node); - } - - /** - * Add an edge between the specified consumer and supplier in this graph. - */ - public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { - // Check if the consumer and supplier are both FlowNodes - if (!(flowConsumer instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - if (!(flowSupplier instanceof FlowNode)) { - throw new IllegalArgumentException("Flow consumer is not a FlowNode"); - } - - // Check of the consumer and supplier are present in this graph - if (!(this.nodes.contains((FlowNode) flowConsumer))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); - } - if (!(this.nodes.contains((FlowNode) flowSupplier))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); - } - - final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); - - edges.add(flowEdge); - - nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge); - nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge); - } - - public void removeEdge(FlowEdge flowEdge) { - final FlowConsumer consumer = flowEdge.getConsumer(); - final FlowSupplier supplier = flowEdge.getSupplier(); - nodeToEdge.get((FlowNode) consumer).remove(flowEdge); - nodeToEdge.get((FlowNode) supplier).remove(flowEdge); - - edges.remove(flowEdge); - flowEdge.close(); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java deleted file mode 100644 index d1faf465..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.time.InstantSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link FlowNode} represents a node in a {@link FlowGraph}. - */ -public abstract class FlowNode { - private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class); - - protected enum NodeState { - PENDING, // Stage is active, but is not running any updates - UPDATING, // Stage is active, and running an update - INVALIDATED, // Stage is deemed invalid, and should run an update - CLOSING, // Stage is being closed, final updates can still be run - CLOSED // Stage is closed and should not run any updates - } - - protected NodeState nodeState = NodeState.PENDING; - - /** - * The deadline of the stage after which an update should run. - */ - long deadline = Long.MAX_VALUE; - - /** - * The index of the timer in the {@link FlowTimerQueue}. - */ - int timerIndex = -1; - - protected InstantSource clock; - protected FlowGraph parentGraph; - protected FlowEngine engine; - - /** - * Construct a new {@link FlowNode} instance. - * - * @param parentGraph The {@link FlowGraph} this stage belongs to. - */ - public FlowNode(FlowGraph parentGraph) { - this.parentGraph = parentGraph; - this.engine = parentGraph.getEngine(); - this.clock = engine.getClock(); - - this.parentGraph.addNode(this); - } - - /** - * Return the {@link FlowGraph} to which this stage belongs. - */ - public FlowGraph getGraph() { - return parentGraph; - } - - /** - * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). - */ - public long getDeadline() { - return deadline; - } - - public void setDeadline(long deadline) { - this.deadline = deadline; - } - - public void setTimerIndex(int index) { - this.timerIndex = index; - } - /** - * Invalidate the {@link FlowNode} forcing the stage to update. - * - *

- * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to - * prevent having to re-query the clock. This method should not be called during an update. - */ - public void invalidate(long now) { - // If there is already an update running, - // notify the update, that a next update should be run after - if (this.nodeState == NodeState.UPDATING) { - this.nodeState = NodeState.INVALIDATED; - } else { - engine.scheduleImmediate(now, this); - } - } - - /** - * Invalidate the {@link FlowNode} forcing the stage to update. - */ - public void invalidate() { - invalidate(clock.millis()); - } - - /** - * Update the state of the stage. - */ - public void update(long now) { - this.nodeState = NodeState.UPDATING; - - long newDeadline = this.deadline; - - try { - newDeadline = this.onUpdate(now); - } catch (Exception e) { - doFail(e); - } - - // Check whether the stage is marked as closing. - if (this.nodeState == NodeState.INVALIDATED) { - newDeadline = now; - } - if (this.nodeState == NodeState.CLOSING) { - closeNode(); - return; - } - - this.deadline = newDeadline; - - // Update the timer queue with the new deadline - engine.scheduleDelayedInContext(this); - - this.nodeState = NodeState.PENDING; - } - - /** - * This method is invoked when the one of the stage's InPorts or OutPorts is invalidated. - * - * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. - * @return The next deadline for the stage. - */ - public abstract long onUpdate(long now); - - /** - * This method is invoked when an uncaught exception is caught by the engine. When this happens, the - */ - void doFail(Throwable cause) { - LOGGER.warn("Uncaught exception (closing stage)", cause); - - closeNode(); - } - - /** - * This method is invoked when the {@link FlowNode} exits successfully or due to failure. - */ - public void closeNode() { - if (this.nodeState == NodeState.CLOSED) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); - return; - } - - // If this stage is running an update, notify it that is should close after. - if (this.nodeState == NodeState.UPDATING) { - // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); - this.nodeState = NodeState.CLOSING; - return; - } - - // Mark the stage as closed - this.nodeState = NodeState.CLOSED; - - // Remove stage from parent graph - this.parentGraph.removeNode(this); - - // Remove stage from the timer queue - this.deadline = Long.MAX_VALUE; - this.engine.scheduleDelayedInContext(this); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java deleted file mode 100644 index 37b3c65b..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.ArrayDeque; -import java.util.Arrays; - -/** - * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s - * that have been updated during the engine cycle and should converge. - *

- * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -final class FlowNodeQueue { - /** - * The array of elements in the queue. - */ - private FlowNode[] elements; - - private int head = 0; - private int tail = 0; - - public FlowNodeQueue(int initialCapacity) { - elements = new FlowNode[initialCapacity]; - } - - /** - * Add the specified context to the queue. - */ - void add(FlowNode ctx) { - final FlowNode[] es = elements; - int tail = this.tail; - - es[tail] = ctx; - - tail = inc(tail, es.length); - this.tail = tail; - - if (head == tail) { - doubleCapacity(); - } - } - - /** - * Remove a {@link FlowNode} from the queue or null if the queue is empty. - */ - FlowNode poll() { - final FlowNode[] es = elements; - int head = this.head; - FlowNode ctx = es[head]; - - if (ctx != null) { - es[head] = null; - this.head = inc(head, es.length); - } - - return ctx; - } - - /** - * Doubles the capacity of this deque - */ - private void doubleCapacity() { - int oldCapacity = elements.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - if (newCapacity < 0) { - throw new IllegalStateException("Sorry, deque too big"); - } - - final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); - - // Exceptionally, here tail == head needs to be disambiguated - if (tail < head || (tail == head && es[head] != null)) { - // wrap around; slide first leg forward to end of array - int newSpace = newCapacity - oldCapacity; - System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); - for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; - } - } - - /** - * Circularly increments i, mod modulus. - * Precondition and postcondition: 0 <= i < modulus. - */ - private static int inc(int i, int modulus) { - if (++i >= modulus) i = 0; - return i; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java deleted file mode 100644 index 955f4943..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2024 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -public interface FlowSupplier { - - void handleDemand(FlowEdge consumerEdge, double newDemand); - - void pushSupply(FlowEdge consumerEdge, double newSupply); - - void addConsumerEdge(FlowEdge consumerEdge); - - void removeConsumerEdge(FlowEdge consumerEdge); - - double getCapacity(); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java deleted file mode 100644 index 1e348b10..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.Arrays; - -/** - * A specialized priority queue for timers of {@link FlowNode}s. - *

- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation - * being generic. - */ -public final class FlowTimerQueue { - /** - * Array representation of binary heap of {@link FlowNode} instances. - */ - private FlowNode[] queue; - - /** - * The number of elements in the priority queue. - */ - private int size = 0; - - /** - * Construct a {@link FlowTimerQueue} with the specified initial capacity. - * - * @param initialCapacity The initial capacity of the queue. - */ - public FlowTimerQueue(int initialCapacity) { - this.queue = new FlowNode[initialCapacity]; - } - - /** - * Enqueue a timer for the specified context or update the existing timer. - */ - public void enqueue(FlowNode node) { - FlowNode[] es = queue; - int k = node.timerIndex; - - if (node.deadline != Long.MAX_VALUE) { - if (k >= 0) { - update(es, node, k); - } else { - add(es, node); - } - } else if (k >= 0) { - delete(es, k); - } - } - - /** - * Retrieve the head of the queue if its deadline does not exceed now. - * - * @param now The timestamp that the deadline of the head of the queue should not exceed. - * @return The head of the queue if its deadline does not exceed now, otherwise null. - */ - public FlowNode poll(long now) { - if (this.size == 0) { - return null; - } - - final FlowNode[] es = queue; - final FlowNode head = es[0]; - - if (now < head.deadline) { - return null; - } - - int n = size - 1; - this.size = n; - final FlowNode next = es[n]; - es[n] = null; // Clear the last element of the queue - - if (n > 0) { - siftDown(0, next, es, n); - } - - head.timerIndex = -1; - return head; - } - - /** - * Find the earliest deadline in the queue. - */ - public long peekDeadline() { - if (this.size > 0) { - return this.queue[0].deadline; - } - - return Long.MAX_VALUE; - } - - /** - * Add a new entry to the queue. - */ - private void add(FlowNode[] es, FlowNode node) { - if (this.size >= es.length) { - // Re-fetch the resized array - es = grow(); - } - - siftUp(this.size, node, es); - - this.size++; - } - - /** - * Update the deadline of an existing entry in the queue. - */ - private void update(FlowNode[] es, FlowNode node, int k) { - if (k > 0) { - int parent = (k - 1) >>> 1; - if (es[parent].deadline > node.deadline) { - siftUp(k, node, es); - return; - } - } - - siftDown(k, node, es, this.size); - } - - /** - * Deadline an entry from the queue. - */ - private void delete(FlowNode[] es, int k) { - int s = --this.size; - if (s == k) { - es[k] = null; // Element is last in the queue - } else { - FlowNode moved = es[s]; - es[s] = null; - - siftDown(k, moved, es, s); - - if (es[k] == moved) { - siftUp(k, moved, es); - } - } - } - - /** - * Increases the capacity of the array. - */ - private FlowNode[] grow() { - FlowNode[] queue = this.queue; - int oldCapacity = queue.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - - queue = Arrays.copyOf(queue, newCapacity); - this.queue = queue; - return queue; - } - - private static void siftUp(int k, FlowNode key, FlowNode[] es) { - while (k > 0) { - int parent = (k - 1) >>> 1; - FlowNode e = es[parent]; - if (key.deadline >= e.deadline) break; - es[k] = e; - e.timerIndex = k; - k = parent; - } - es[k] = key; - key.timerIndex = k; - } - - private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { - int half = n >>> 1; // loop while a non-leaf - while (k < half) { - int child = (k << 1) + 1; // assume left child is least - FlowNode c = es[child]; - int right = child + 1; - if (right < n && c.deadline > es[right].deadline) c = es[child = right]; - - if (key.deadline <= c.deadline) break; - - es[k] = c; - c.timerIndex = k; - k = child; - } - - es[k] = key; - key.timerIndex = k; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java deleted file mode 100644 index 15da2f23..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.engine; - -import java.util.Arrays; - -/** - * A specialized monotonic stack implementation for tracking the scheduled engine invocations. - *

- * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -public final class InvocationStack { - /** - * The array of elements in the stack. - */ - private long[] elements; - - private int head = -1; - - public InvocationStack(int initialCapacity) { - this.elements = new long[initialCapacity]; - Arrays.fill(this.elements, Long.MIN_VALUE); - } - - /** - * Try to add the specified invocation to the monotonic stack. - * - * @param invocation The timestamp of the invocation. - * @return true if the invocation was added, false otherwise. - */ - public boolean tryAdd(long invocation) { - final long[] es = this.elements; - int head = this.head; - - if (head < 0 || es[head] > invocation) { - es[head + 1] = invocation; - this.head = head + 1; - - if (head + 2 == es.length) { - doubleCapacity(); - } - - return true; - } - - return false; - } - - /** - * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. - */ - public long poll() { - int head = this.head--; - - if (head >= 0) { - return this.elements[head]; - } - - return Long.MAX_VALUE; - } - - /** - * Doubles the capacity of this deque - */ - private void doubleCapacity() { - int oldCapacity = this.elements.length; - int newCapacity = oldCapacity + (oldCapacity >> 1); - if (newCapacity < 0) { - throw new IllegalStateException("Sorry, deque too big"); - } - - this.elements = Arrays.copyOf(this.elements, newCapacity); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java new file mode 100644 index 00000000..1a068b40 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.time.Clock; +import java.time.InstantSource; +import kotlin.coroutines.CoroutineContext; +import org.opendc.common.Dispatcher; +import org.opendc.simulator.engine.graph.FlowGraph; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A {@link FlowEngine} simulates a generic flow network. + *

+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public final class FlowEngine implements Runnable { + /** + * The queue of {@link FlowNode} updates that are scheduled for immediate execution. + */ + private final FlowNodeQueue queue = new FlowNodeQueue(256); + + /** + * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. + */ + private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + + /** + * The stack of engine invocations to occur in the future. + */ + private final InvocationStack futureInvocations = new InvocationStack(256); + + /** + * A flag to indicate that the engine is active. + */ + private boolean active; + + private final Dispatcher dispatcher; + private final InstantSource clock; + + /** + * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}. + */ + public static FlowEngine create(Dispatcher dispatcher) { + return new FlowEngine(dispatcher); + } + + FlowEngine(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + this.clock = dispatcher.getTimeSource(); + } + + /** + * Obtain the (virtual) {@link Clock} driving the simulation. + */ + public InstantSource getClock() { + return clock; + } + + /** + * Return a new {@link FlowGraph} that can be used to build a flow network. + */ + public FlowGraph newGraph() { + return new FlowGraph(this); + } + + /** + * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + */ + public void scheduleImmediate(long now, FlowNode ctx) { + scheduleImmediateInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + trySchedule(futureInvocations, now, now); + } + + /** + * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + *

+ * This method should only be invoked while inside an engine cycle. + */ + public void scheduleImmediateInContext(FlowNode ctx) { + queue.add(ctx); + } + + /** + * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. + */ + public void scheduleDelayed(FlowNode ctx) { + scheduleDelayedInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + long deadline = timerQueue.peekDeadline(); + if (deadline != Long.MAX_VALUE) { + trySchedule(futureInvocations, clock.millis(), deadline); + } + } + + /** + * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. + *

+ * This method should only be invoked while inside an engine cycle. + */ + public void scheduleDelayedInContext(FlowNode ctx) { + FlowTimerQueue timerQueue = this.timerQueue; + timerQueue.enqueue(ctx); + } + + /** + * Run all the enqueued actions for the specified timestamp (now). + */ + private void doRunEngine(long now) { + final FlowNodeQueue queue = this.queue; + final FlowTimerQueue timerQueue = this.timerQueue; + + try { + // Mark the engine as active to prevent concurrent calls to this method + active = true; + + // Execute all scheduled updates at current timestamp + while (true) { + final FlowNode ctx = timerQueue.poll(now); + if (ctx == null) { + break; + } + + ctx.update(now); + } + + // Execute all immediate updates + while (true) { + final FlowNode ctx = queue.poll(); + if (ctx == null) { + break; + } + + ctx.update(now); + } + } finally { + active = false; + } + + // Schedule an engine invocation for the next update to occur. + long headDeadline = timerQueue.peekDeadline(); + if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { + trySchedule(futureInvocations, now, headDeadline); + } + } + + @Override + public void run() { + doRunEngine(futureInvocations.poll()); + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param scheduled The queue of scheduled invocations. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + */ + private void trySchedule(InvocationStack scheduled, long now, long target) { + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (scheduled.tryAdd(target)) { + dispatcher.schedule(target - now, this); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java new file mode 100644 index 00000000..bd622083 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowNodeQueue.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.ArrayDeque; +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s + * that have been updated during the engine cycle and should converge. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class FlowNodeQueue { + /** + * The array of elements in the queue. + */ + private FlowNode[] elements; + + private int head = 0; + private int tail = 0; + + public FlowNodeQueue(int initialCapacity) { + elements = new FlowNode[initialCapacity]; + } + + /** + * Add the specified context to the queue. + */ + void add(FlowNode ctx) { + final FlowNode[] es = elements; + int tail = this.tail; + + es[tail] = ctx; + + tail = inc(tail, es.length); + this.tail = tail; + + if (head == tail) { + doubleCapacity(); + } + } + + /** + * Remove a {@link FlowNode} from the queue or null if the queue is empty. + */ + FlowNode poll() { + final FlowNode[] es = elements; + int head = this.head; + FlowNode ctx = es[head]; + + if (ctx != null) { + es[head] = null; + this.head = inc(head, es.length); + } + + return ctx; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); + + // Exceptionally, here tail == head needs to be disambiguated + if (tail < head || (tail == head && es[head] != null)) { + // wrap around; slide first leg forward to end of array + int newSpace = newCapacity - oldCapacity; + System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); + for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; + } + } + + /** + * Circularly increments i, mod modulus. + * Precondition and postcondition: 0 <= i < modulus. + */ + private static int inc(int i, int modulus) { + if (++i >= modulus) i = 0; + return i; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java new file mode 100644 index 00000000..049eb40d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowTimerQueue.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.Arrays; +import org.opendc.simulator.engine.graph.FlowNode; + +/** + * A specialized priority queue for timers of {@link FlowNode}s. + *

+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +public final class FlowTimerQueue { + /** + * Array representation of binary heap of {@link FlowNode} instances. + */ + private FlowNode[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowTimerQueue(int initialCapacity) { + this.queue = new FlowNode[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + public void enqueue(FlowNode node) { + FlowNode[] es = queue; + int k = node.getTimerIndex(); + + if (node.getDeadline() != Long.MAX_VALUE) { + if (k >= 0) { + update(es, node, k); + } else { + add(es, node); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed now. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed now, otherwise null. + */ + public FlowNode poll(long now) { + if (this.size == 0) { + return null; + } + + final FlowNode[] es = queue; + final FlowNode head = es[0]; + + if (now < head.getDeadline()) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowNode next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.setTimerIndex(-1); + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + public long peekDeadline() { + if (this.size > 0) { + return this.queue[0].getDeadline(); + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowNode[] es, FlowNode node) { + if (this.size >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(this.size, node, es); + + this.size++; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowNode[] es, FlowNode node, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].getDeadline() > node.getDeadline()) { + siftUp(k, node, es); + return; + } + } + + siftDown(k, node, es, this.size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowNode[] es, int k) { + int s = --this.size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowNode moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowNode[] grow() { + FlowNode[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowNode key, FlowNode[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowNode e = es[parent]; + if (key.getDeadline() >= e.getDeadline()) break; + es[k] = e; + e.setTimerIndex(k); + k = parent; + } + es[k] = key; + key.setTimerIndex(k); + } + + private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowNode c = es[child]; + int right = child + 1; + if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right]; + + if (key.getDeadline() <= c.getDeadline()) break; + + es[k] = c; + c.setTimerIndex(k); + k = child; + } + + es[k] = key; + key.setTimerIndex(k); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java new file mode 100644 index 00000000..5607278c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/InvocationStack.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.engine; + +import java.util.Arrays; + +/** + * A specialized monotonic stack implementation for tracking the scheduled engine invocations. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +public final class InvocationStack { + /** + * The array of elements in the stack. + */ + private long[] elements; + + private int head = -1; + + public InvocationStack(int initialCapacity) { + this.elements = new long[initialCapacity]; + Arrays.fill(this.elements, Long.MIN_VALUE); + } + + /** + * Try to add the specified invocation to the monotonic stack. + * + * @param invocation The timestamp of the invocation. + * @return true if the invocation was added, false otherwise. + */ + public boolean tryAdd(long invocation) { + final long[] es = this.elements; + int head = this.head; + + if (head < 0 || es[head] > invocation) { + es[head + 1] = invocation; + this.head = head + 1; + + if (head + 2 == es.length) { + doubleCapacity(); + } + + return true; + } + + return false; + } + + /** + * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. + */ + public long poll() { + int head = this.head--; + + if (head >= 0) { + return this.elements[head]; + } + + return Long.MAX_VALUE; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = this.elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + this.elements = Arrays.copyOf(this.elements, newCapacity); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java new file mode 100644 index 00000000..2130d376 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowConsumer.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +public interface FlowConsumer { + + void handleSupply(FlowEdge supplierEdge, double newSupply); + + void pushDemand(FlowEdge supplierEdge, double newDemand); + + void addSupplierEdge(FlowEdge supplierEdge); + + void removeSupplierEdge(FlowEdge supplierEdge); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java new file mode 100644 index 00000000..7ef091f8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +import java.util.ArrayList; +import java.util.Arrays; + +public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { + private final ArrayList consumerEdges = new ArrayList<>(); + private FlowEdge supplierEdge; + + private final ArrayList demands = new ArrayList<>(); // What is demanded by the consumers + private final ArrayList supplies = new ArrayList<>(); // What is supplied to the consumers + + private double totalDemand; // The total demand of all the consumers + private double totalSupply; // The total supply from the supplier + + private boolean overLoaded = false; + private int currentConsumerIdx = -1; + + private double capacity; // What is the max capacity + + public FlowDistributor(FlowGraph graph) { + super(graph); + } + + public double getTotalDemand() { + return totalDemand; + } + + public double getTotalSupply() { + return totalSupply; + } + + public double getCapacity() { + return capacity; + } + + public long onUpdate(long now) { + + return Long.MAX_VALUE; + } + + private void distributeSupply() { + // if supply >= demand -> push supplies to all tasks + if (this.totalSupply > this.totalDemand) { + + // If this came from a state of overload, provide all consumers with their demand + if (this.overLoaded) { + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx)); + } + } + + if (this.currentConsumerIdx != -1) { + this.pushSupply( + this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx)); + this.currentConsumerIdx = -1; + } + + this.overLoaded = false; + } + + // if supply < demand -> distribute the supply over all consumers + else { + this.overLoaded = true; + double[] supplies = redistributeSupply(this.demands, this.totalSupply); + + for (int idx = 0; idx < this.consumerEdges.size(); idx++) { + this.pushSupply(this.consumerEdges.get(idx), supplies[idx]); + } + } + } + + private record Demand(int idx, double value) {} + + /** + * Distributed the available supply over the different demands. + * The supply is distributed using MaxMin Fairness. + * + * TODO: Move this outside of the Distributor so we can easily add different redistribution methods + */ + private static double[] redistributeSupply(ArrayList demands, double totalSupply) { + int inputSize = demands.size(); + + final double[] supplies = new double[inputSize]; + final Demand[] tempDemands = new Demand[inputSize]; + + for (int i = 0; i < inputSize; i++) { + tempDemands[i] = new Demand(i, demands.get(i)); + } + + Arrays.sort(tempDemands, (o1, o2) -> { + Double i1 = o1.value; + Double i2 = o2.value; + return i1.compareTo(i2); + }); + + double availableCapacity = totalSupply; // totalSupply + + for (int i = 0; i < inputSize; i++) { + double d = tempDemands[i].value; + + if (d == 0.0) { + continue; + } + + double availableShare = availableCapacity / (inputSize - i); + double r = Math.min(d, availableShare); + + int idx = tempDemands[i].idx; + supplies[idx] = r; // Update the rates + availableCapacity -= r; + } + + // Return the used capacity + return supplies; + } + + /** + * Add a new consumer. + * Set its demand and supply to 0.0 + */ + @Override + public void addConsumerEdge(FlowEdge consumerEdge) { + consumerEdge.setConsumerIndex(this.consumerEdges.size()); + + this.consumerEdges.add(consumerEdge); + this.demands.add(0.0); + this.supplies.add(0.0); + } + + @Override + public void addSupplierEdge(FlowEdge supplierEdge) { + this.supplierEdge = supplierEdge; + this.capacity = supplierEdge.getCapacity(); + this.totalSupply = 0; + } + + @Override + public void removeConsumerEdge(FlowEdge consumerEdge) { + int idx = consumerEdge.getConsumerIndex(); + + if (idx == -1) { + return; + } + + this.totalDemand -= consumerEdge.getDemand(); + + this.consumerEdges.remove(idx); + this.demands.remove(idx); + this.supplies.remove(idx); + + // update the consumer index for all consumerEdges higher than this. + for (int i = idx; i < this.consumerEdges.size(); i++) { + this.consumerEdges.get(i).setConsumerIndex(i); + } + + this.currentConsumerIdx = -1; + + if (this.overLoaded) { + this.distributeSupply(); + } + + this.pushDemand(this.supplierEdge, this.totalDemand); + } + + @Override + public void removeSupplierEdge(FlowEdge supplierEdge) { + this.supplierEdge = null; + this.capacity = 0; + this.totalSupply = 0; + } + + @Override + public void handleDemand(FlowEdge consumerEdge, double newDemand) { + int idx = consumerEdge.getConsumerIndex(); + + this.currentConsumerIdx = idx; + + if (idx == -1) { + System.out.println("Error (FlowDistributor): Demand pushed by an unknown consumer"); + return; + } + + // Update the total demand (This is cheaper than summing over all demands) + double prevDemand = demands.get(idx); + + demands.set(idx, newDemand); + this.totalDemand += (newDemand - prevDemand); + + if (overLoaded) { + distributeSupply(); + } + + // Send new totalDemand to CPU + // TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply) + this.pushDemand(this.supplierEdge, this.totalDemand); + } + + @Override + public void handleSupply(FlowEdge supplierEdge, double newSupply) { + this.totalSupply = newSupply; + + this.distributeSupply(); + } + + @Override + public void pushDemand(FlowEdge supplierEdge, double newDemand) { + this.supplierEdge.pushDemand(newDemand); + } + + @Override + public void pushSupply(FlowEdge consumerEdge, double newSupply) { + int idx = consumerEdge.getConsumerIndex(); + + if (idx == -1) { + System.out.println("Error (FlowDistributor): pushing supply to an unknown consumer"); + } + + if (supplies.get(idx) == newSupply) { + return; + } + + supplies.set(idx, newSupply); + consumerEdge.pushSupply(newSupply); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java new file mode 100644 index 00000000..b7162508 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowEdge.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +/** + * An edge that connects two FlowStages. + * A connection between FlowStages always consist of a FlowStage that demands + * something, and a FlowStage that Delivers something + * For instance, this could be the connection between a workload, and its machine + */ +public class FlowEdge { + private FlowConsumer consumer; + private FlowSupplier supplier; + + private int consumerIndex = -1; + private int supplierIndex = -1; + + private double demand = 0.0; + private double supply = 0.0; + + private double capacity; + + public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) { + if (!(consumer instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + if (!(supplier instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + + this.consumer = consumer; + this.supplier = supplier; + + this.capacity = supplier.getCapacity(); + + this.consumer.addSupplierEdge(this); + this.supplier.addConsumerEdge(this); + } + + public void close() { + if (this.consumer != null) { + this.consumer.removeSupplierEdge(this); + this.consumer = null; + } + + if (this.supplier != null) { + this.supplier.removeConsumerEdge(this); + this.supplier = null; + } + } + + public FlowConsumer getConsumer() { + return consumer; + } + + public FlowSupplier getSupplier() { + return supplier; + } + + public double getCapacity() { + return capacity; + } + + public double getDemand() { + return this.demand; + } + + public double getSupply() { + return this.supply; + } + + public int getConsumerIndex() { + return consumerIndex; + } + + public void setConsumerIndex(int consumerIndex) { + this.consumerIndex = consumerIndex; + } + + public int getSupplierIndex() { + return supplierIndex; + } + + public void setSupplierIndex(int supplierIndex) { + this.supplierIndex = supplierIndex; + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(double newDemand) { + if (newDemand == this.demand) { + return; + } + + this.demand = newDemand; + this.supplier.handleDemand(this, newDemand); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(double newSupply) { + if (newSupply == this.supply) { + return; + } + + this.supply = newSupply; + this.consumer.handleSupply(this, newSupply); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java new file mode 100644 index 00000000..0e6e137c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +import java.util.ArrayList; +import java.util.HashMap; +import org.opendc.simulator.engine.engine.FlowEngine; + +public class FlowGraph { + private final FlowEngine engine; + private final ArrayList nodes = new ArrayList<>(); + private final ArrayList edges = new ArrayList<>(); + private final HashMap> nodeToEdge = new HashMap<>(); + + public FlowGraph(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the {@link FlowEngine} driving the simulation of the graph. + */ + public FlowEngine getEngine() { + return engine; + } + + /** + * Create a new {@link FlowNode} representing a node in the flow network. + */ + public void addNode(FlowNode node) { + if (nodes.contains(node)) { + System.out.println("Node already exists"); + } + nodes.add(node); + nodeToEdge.put(node, new ArrayList<>()); + long now = this.engine.getClock().millis(); + node.invalidate(now); + } + + /** + * Internal method to remove the specified {@link FlowNode} from the graph. + */ + public void removeNode(FlowNode node) { + + // Remove all edges connected to node + final ArrayList connectedEdges = nodeToEdge.get(node); + while (connectedEdges.size() > 0) { + removeEdge(connectedEdges.get(0)); + } + + nodeToEdge.remove(node); + + // remove the node + nodes.remove(node); + } + + /** + * Add an edge between the specified consumer and supplier in this graph. + */ + public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { + // Check if the consumer and supplier are both FlowNodes + if (!(flowConsumer instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + if (!(flowSupplier instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + + // Check of the consumer and supplier are present in this graph + if (!(this.nodes.contains((FlowNode) flowConsumer))) { + throw new IllegalArgumentException("The consumer is not a node in this graph"); + } + if (!(this.nodes.contains((FlowNode) flowSupplier))) { + throw new IllegalArgumentException("The consumer is not a node in this graph"); + } + + final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); + + edges.add(flowEdge); + + nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge); + nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge); + } + + public void removeEdge(FlowEdge flowEdge) { + final FlowConsumer consumer = flowEdge.getConsumer(); + final FlowSupplier supplier = flowEdge.getSupplier(); + nodeToEdge.get((FlowNode) consumer).remove(flowEdge); + nodeToEdge.get((FlowNode) supplier).remove(flowEdge); + + edges.remove(flowEdge); + flowEdge.close(); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java new file mode 100644 index 00000000..6ee947bc --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowNode.java @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +import java.time.InstantSource; +import org.opendc.simulator.engine.engine.FlowEngine; +import org.opendc.simulator.engine.engine.FlowTimerQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link FlowNode} represents a node in a {@link FlowGraph}. + */ +public abstract class FlowNode { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class); + + protected enum NodeState { + PENDING, // Stage is active, but is not running any updates + UPDATING, // Stage is active, and running an update + INVALIDATED, // Stage is deemed invalid, and should run an update + CLOSING, // Stage is being closed, final updates can still be run + CLOSED // Stage is closed and should not run any updates + } + + protected NodeState nodeState = NodeState.PENDING; + + public NodeState getNodeState() { + return nodeState; + } + + public void setNodeState(NodeState nodeState) { + this.nodeState = nodeState; + } + + public int getTimerIndex() { + return timerIndex; + } + + public void setTimerIndex(int index) { + this.timerIndex = index; + } + + public InstantSource getClock() { + return clock; + } + + public void setClock(InstantSource clock) { + this.clock = clock; + } + + public FlowGraph getParentGraph() { + return parentGraph; + } + + public void setParentGraph(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + } + + public FlowEngine getEngine() { + return engine; + } + + public void setEngine(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + + /** + * The deadline of the stage after which an update should run. + */ + private long deadline = Long.MAX_VALUE; + + /** + * The index of the timer in the {@link FlowTimerQueue}. + */ + private int timerIndex = -1; + + protected InstantSource clock; + protected FlowGraph parentGraph; + protected FlowEngine engine; + + /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** + * Construct a new {@link FlowNode} instance. + * + * @param parentGraph The {@link FlowGraph} this stage belongs to. + */ + public FlowNode(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + this.engine = parentGraph.getEngine(); + this.clock = engine.getClock(); + + this.parentGraph.addNode(this); + } + + /** + * Invalidate the {@link FlowNode} forcing the stage to update. + * + *

+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to + * prevent having to re-query the clock. This method should not be called during an update. + */ + public void invalidate(long now) { + // If there is already an update running, + // notify the update, that a next update should be run after + if (this.nodeState == NodeState.UPDATING) { + this.nodeState = NodeState.INVALIDATED; + } else { + engine.scheduleImmediate(now, this); + } + } + + /** + * Invalidate the {@link FlowNode} forcing the stage to update. + */ + public void invalidate() { + invalidate(clock.millis()); + } + + /** + * Update the state of the stage. + */ + public void update(long now) { + this.nodeState = NodeState.UPDATING; + + long newDeadline = this.deadline; + + try { + newDeadline = this.onUpdate(now); + } catch (Exception e) { + doFail(e); + } + + // Check whether the stage is marked as closing. + if (this.nodeState == NodeState.INVALIDATED) { + newDeadline = now; + } + if (this.nodeState == NodeState.CLOSING) { + closeNode(); + return; + } + + this.deadline = newDeadline; + + // Update the timer queue with the new deadline + engine.scheduleDelayedInContext(this); + + this.nodeState = NodeState.PENDING; + } + + /** + * This method is invoked when the one of the stage's InPorts or OutPorts is invalidated. + * + * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. + * @return The next deadline for the stage. + */ + public abstract long onUpdate(long now); + + /** + * This method is invoked when an uncaught exception is caught by the engine. When this happens, the + */ + void doFail(Throwable cause) { + LOGGER.warn("Uncaught exception (closing stage)", cause); + + closeNode(); + } + + /** + * This method is invoked when the {@link FlowNode} exits successfully or due to failure. + */ + public void closeNode() { + if (this.nodeState == NodeState.CLOSED) { + // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); + return; + } + + // If this stage is running an update, notify it that is should close after. + if (this.nodeState == NodeState.UPDATING) { + // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); + this.nodeState = NodeState.CLOSING; + return; + } + + // Mark the stage as closed + this.nodeState = NodeState.CLOSED; + + // Remove stage from parent graph + this.parentGraph.removeNode(this); + + // Remove stage from the timer queue + this.deadline = Long.MAX_VALUE; + this.engine.scheduleDelayedInContext(this); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java new file mode 100644 index 00000000..84602ee0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowSupplier.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine.graph; + +public interface FlowSupplier { + + void handleDemand(FlowEdge consumerEdge, double newDemand); + + void pushSupply(FlowEdge consumerEdge, double newSupply); + + void addConsumerEdge(FlowEdge consumerEdge); + + void removeConsumerEdge(FlowEdge consumerEdge); + + double getCapacity(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt index 7744d7b2..4dd17dbe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test -import org.opendc.simulator.engine.InvocationStack +import org.opendc.simulator.engine.engine.InvocationStack /** * Test suite for the [InvocationStack] class. -- cgit v1.2.3