diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc')
34 files changed, 727 insertions, 2935 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java new file mode 100644 index 00000000..0af2499a --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator; + +import java.util.ArrayList; +import java.util.Arrays; +import org.opendc.simulator.engine.FlowConsumer; +import org.opendc.simulator.engine.FlowEdge; +import org.opendc.simulator.engine.FlowGraph; +import org.opendc.simulator.engine.FlowNode; +import org.opendc.simulator.engine.FlowSupplier; + +public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer { + private ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); + private FlowEdge supplierEdge; + + private ArrayList<Float> demands = new ArrayList<>(); // What is demanded by the consumers + private ArrayList<Float> supplies = new ArrayList<>(); // What is supplied to the consumers + + private float totalDemand; // The total demand of all the consumers + private float totalSupply; // The total supply from the supplier + private float capacity; // What is the max capacity + + public Multiplexer(FlowGraph graph) { + super(graph); + } + + public float getTotalDemand() { + return totalDemand; + } + + public float getTotalSupply() { + return totalSupply; + } + + public float getCapacity() { + return capacity; + } + + public long onUpdate(long now) { + + if (this.totalDemand > this.capacity) { + redistributeSupply(this.consumerEdges, this.supplies, this.capacity); + } else { + for (int i = 0; i < this.demands.size(); i++) { + this.supplies.set(i, this.demands.get(i)); + } + } + + float totalSupply = 0; + for (int i = 0; i < this.consumerEdges.size(); i++) { + this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i)); + totalSupply += this.supplies.get(i); + } + + // Only update supplier if supply has changed + if (this.totalSupply != totalSupply) { + this.totalSupply = totalSupply; + + pushDemand(this.supplierEdge, this.totalSupply); + } + + return Long.MAX_VALUE; + } + + private static float redistributeSupply( + ArrayList<FlowEdge> consumerEdges, ArrayList<Float> supplies, float capacity) { + final long[] consumers = new long[consumerEdges.size()]; + + for (int i = 0; i < consumers.length; i++) { + FlowEdge consumer = consumerEdges.get(i); + + if (consumer == null) { + break; + } + + consumers[i] = ((long) Float.floatToRawIntBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL); + } + Arrays.sort(consumers); + + float availableCapacity = capacity; + int inputSize = consumers.length; + + for (int i = 0; i < inputSize; i++) { + long v = consumers[i]; + int slot = (int) v; + float d = Float.intBitsToFloat((int) (v >> 32)); + + if (d == 0.0) { + continue; + } + + float availableShare = availableCapacity / (inputSize - i); + float r = Math.min(d, availableShare); + + supplies.set(slot, r); // Update the rates + availableCapacity -= r; + } + + // Return the used capacity + return capacity - availableCapacity; + } + + /** + * Add a new consumer. + * Set its demand and supply to 0.0 + */ + @Override + public void addConsumerEdge(FlowEdge consumerEdge) { + this.consumerEdges.add(consumerEdge); + this.demands.add(0f); + this.supplies.add(0f); + } + + @Override + public void addSupplierEdge(FlowEdge supplierEdge) { + this.supplierEdge = supplierEdge; + this.capacity = supplierEdge.getCapacity(); + this.totalSupply = 0; + } + + @Override + public void removeConsumerEdge(FlowEdge consumerEdge) { + int idx = this.consumerEdges.indexOf(consumerEdge); + + if (idx == -1) { + return; + } + + this.totalDemand -= consumerEdge.getDemand(); + + this.consumerEdges.remove(idx); + this.demands.remove(idx); + this.supplies.remove(idx); + + this.invalidate(); + } + + @Override + public void removeSupplierEdge(FlowEdge supplierEdge) { + this.supplierEdge = null; + this.capacity = 0; + this.totalSupply = 0; + } + + @Override + public void handleDemand(FlowEdge consumerEdge, float newDemand) { + int idx = consumerEdges.indexOf(consumerEdge); + + if (idx == -1) { + System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer"); + return; + } + + float prevDemand = demands.get(idx); + demands.set(idx, newDemand); + + this.totalDemand += (newDemand - prevDemand); + } + + @Override + public void handleSupply(FlowEdge supplierEdge, float newSupply) { + if (newSupply == this.totalSupply) { + return; + } + + this.totalSupply = newSupply; + } + + @Override + public void pushDemand(FlowEdge supplierEdge, float newDemand) { + this.supplierEdge.pushDemand(newDemand); + } + + @Override + public void pushSupply(FlowEdge consumerEdge, float newSupply) { + int idx = consumerEdges.indexOf(consumerEdge); + + if (idx == -1) { + System.out.println("Error (Multiplexer): pushing supply to an unknown consumer"); + } + + if (newSupply == supplies.get(idx)) { + return; + } + + supplies.set(idx, newSupply); + consumerEdge.pushSupply(newSupply); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java index 4a9ea6a5..7ba5dea7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 AtLarge Research + * Copyright (c) 2024 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,19 +20,15 @@ * SOFTWARE. */ -package org.opendc.simulator.flow2; +package org.opendc.simulator.engine; -/** - * An in-going edge in a {@link FlowGraph}. - */ -public interface Inlet { - /** - * Return the {@link FlowGraph} to which the inlet is exposed. - */ - FlowGraph getGraph(); +public interface FlowConsumer { + + void handleSupply(FlowEdge supplierEdge, float newSupply); + + void pushDemand(FlowEdge supplierEdge, float newDemand); + + void addSupplierEdge(FlowEdge supplierEdge); - /** - * Return the name of the inlet. - */ - String getName(); + void removeSupplierEdge(FlowEdge supplierEdge); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java new file mode 100644 index 00000000..0edc9e68 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine; + +/** + * An edge that connects two FlowStages. + * A connection between FlowStages always consist of a FlowStage that demands + * something, and a FlowStage that Delivers something + * For instance, this could be the connection between a workload, and its machine + */ +public class FlowEdge { + private FlowConsumer consumer; + private FlowSupplier supplier; + + private float demand = 0.0f; + private float supply = 0.0f; + + private float capacity; + + public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) { + if (!(consumer instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + if (!(supplier instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + + this.consumer = consumer; + this.supplier = supplier; + + this.capacity = supplier.getCapacity(); + + this.consumer.addSupplierEdge(this); + this.supplier.addConsumerEdge(this); + } + + public void close() { + if (this.consumer != null) { + this.consumer.removeSupplierEdge(this); + this.consumer = null; + } + + if (this.supplier != null) { + this.supplier.removeConsumerEdge(this); + this.supplier = null; + } + } + + public FlowConsumer getConsumer() { + return consumer; + } + + public FlowSupplier getSupplier() { + return supplier; + } + + public float getCapacity() { + return capacity; + } + + public float getDemand() { + return this.demand; + } + + public float getSupply() { + return this.supply; + } + + /** + * Push new demand from the Consumer to the Supplier + */ + public void pushDemand(float newDemand) { + if (newDemand == this.demand) { + return; + } + + this.demand = newDemand; + this.supplier.handleDemand(this, newDemand); + ((FlowNode) this.supplier).invalidate(); + } + + /** + * Push new supply from the Supplier to the Consumer + */ + public void pushSupply(float newSupply) { + if (newSupply == this.supply) { + return; + } + + this.supply = newSupply; + this.consumer.handleSupply(this, newSupply); + ((FlowNode) this.consumer).invalidate(); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java index c0f52505..10af7c51 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java @@ -20,12 +20,10 @@ * SOFTWARE. */ -package org.opendc.simulator.flow2; +package org.opendc.simulator.engine; import java.time.Clock; import java.time.InstantSource; -import java.util.ArrayList; -import java.util.List; import kotlin.coroutines.CoroutineContext; import org.opendc.common.Dispatcher; @@ -37,12 +35,12 @@ import org.opendc.common.Dispatcher; */ public final class FlowEngine implements Runnable { /** - * The queue of {@link FlowStage} updates that are scheduled for immediate execution. + * The queue of {@link FlowNode} updates that are scheduled for immediate execution. */ - private final FlowStageQueue queue = new FlowStageQueue(256); + private final FlowNodeQueue queue = new FlowNodeQueue(256); /** - * A priority queue containing the {@link FlowStage} updates to be scheduled in the future. + * A priority queue containing the {@link FlowNode} updates to be scheduled in the future. */ private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); @@ -82,16 +80,16 @@ public final class FlowEngine implements Runnable { * Return a new {@link FlowGraph} that can be used to build a flow network. */ public FlowGraph newGraph() { - return new RootGraph(this); + return new FlowGraph(this); } /** - * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. * <p> * This method should be used when the state of a flow context is invalidated/interrupted and needs to be * re-computed. */ - void scheduleImmediate(long now, FlowStage ctx) { + void scheduleImmediate(long now, FlowNode ctx) { scheduleImmediateInContext(ctx); // In-case the engine is already running in the call-stack, return immediately. The changes will be picked @@ -104,21 +102,21 @@ public final class FlowEngine implements Runnable { } /** - * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle. * <p> * This method should be used when the state of a flow context is invalidated/interrupted and needs to be * re-computed. * <p> * This method should only be invoked while inside an engine cycle. */ - void scheduleImmediateInContext(FlowStage ctx) { + void scheduleImmediateInContext(FlowNode ctx) { queue.add(ctx); } /** - * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. */ - void scheduleDelayed(FlowStage ctx) { + void scheduleDelayed(FlowNode ctx) { scheduleDelayedInContext(ctx); // In-case the engine is already running in the call-stack, return immediately. The changes will be picked @@ -134,11 +132,11 @@ public final class FlowEngine implements Runnable { } /** - * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + * Enqueue the specified {@link FlowNode} to be updated at its updated deadline. * <p> * This method should only be invoked while inside an engine cycle. */ - void scheduleDelayedInContext(FlowStage ctx) { + void scheduleDelayedInContext(FlowNode ctx) { FlowTimerQueue timerQueue = this.timerQueue; timerQueue.enqueue(ctx); } @@ -147,7 +145,7 @@ public final class FlowEngine implements Runnable { * Run all the enqueued actions for the specified timestamp (<code>now</code>). */ private void doRunEngine(long now) { - final FlowStageQueue queue = this.queue; + final FlowNodeQueue queue = this.queue; final FlowTimerQueue timerQueue = this.timerQueue; try { @@ -156,22 +154,22 @@ public final class FlowEngine implements Runnable { // Execute all scheduled updates at current timestamp while (true) { - final FlowStage ctx = timerQueue.poll(now); + final FlowNode ctx = timerQueue.poll(now); if (ctx == null) { break; } - ctx.onUpdate(now); + ctx.update(now); } // Execute all immediate updates while (true) { - final FlowStage ctx = queue.poll(); + final FlowNode ctx = queue.poll(); if (ctx == null) { break; } - ctx.onUpdate(now); + ctx.update(now); } } finally { active = false; @@ -203,54 +201,4 @@ public final class FlowEngine implements Runnable { dispatcher.schedule(target - now, this); } } - - /** - * Internal implementation of a root {@link FlowGraph}. - */ - private static final class RootGraph implements FlowGraphInternal { - private final FlowEngine engine; - private final List<FlowStage> stages = new ArrayList<>(); - - public RootGraph(FlowEngine engine) { - this.engine = engine; - } - - @Override - public FlowEngine getEngine() { - return engine; - } - - @Override - public FlowStage newStage(FlowStageLogic logic) { - final FlowEngine engine = this.engine; - final FlowStage stage = new FlowStage(this, logic); - stages.add(stage); - long now = engine.getClock().millis(); - stage.invalidate(now); - return stage; - } - - @Override - public void connect(Outlet outlet, Inlet inlet) { - FlowGraphInternal.connect(this, outlet, inlet); - } - - @Override - public void disconnect(Outlet outlet) { - FlowGraphInternal.disconnect(this, outlet); - } - - @Override - public void disconnect(Inlet inlet) { - FlowGraphInternal.disconnect(this, inlet); - } - - /** - * Internal method to remove the specified {@link FlowStage} from the graph. - */ - @Override - public void detach(FlowStage stage) { - stages.remove(stage); - } - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java new file mode 100644 index 00000000..d82b542b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2024 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine; + +import java.util.ArrayList; +import java.util.HashMap; + +public class FlowGraph { + private final FlowEngine engine; + private final ArrayList<FlowNode> nodes = new ArrayList<>(); + private final ArrayList<FlowEdge> edges = new ArrayList<>(); + private final HashMap<FlowNode, ArrayList<FlowEdge>> nodeToEdge = new HashMap<>(); + + public FlowGraph(FlowEngine engine) { + this.engine = engine; + } + + /** + * Return the {@link FlowEngine} driving the simulation of the graph. + */ + public FlowEngine getEngine() { + return engine; + } + + /** + * Create a new {@link FlowNode} representing a node in the flow network. + */ + public void addNode(FlowNode node) { + if (nodes.contains(node)) { + System.out.println("Node already exists"); + } + nodes.add(node); + nodeToEdge.put(node, new ArrayList<>()); + long now = this.engine.getClock().millis(); + node.invalidate(now); + } + + /** + * Internal method to remove the specified {@link FlowNode} from the graph. + */ + public void removeNode(FlowNode node) { + + // Remove all edges connected to node + final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node); + while (connectedEdges.size() > 0) { + removeEdge(connectedEdges.get(0)); + } + + nodeToEdge.remove(node); + + // remove the node + nodes.remove(node); + } + + /** + * Add an edge between the specified consumer and supplier in this graph. + */ + public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { + // Check if the consumer and supplier are both FlowNodes + if (!(flowConsumer instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + if (!(flowSupplier instanceof FlowNode)) { + throw new IllegalArgumentException("Flow consumer is not a FlowNode"); + } + + // Check of the consumer and supplier are present in this graph + if (!(this.nodes.contains((FlowNode) flowConsumer))) { + throw new IllegalArgumentException("The consumer is not a node in this graph"); + } + if (!(this.nodes.contains((FlowNode) flowSupplier))) { + throw new IllegalArgumentException("The consumer is not a node in this graph"); + } + + final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); + + edges.add(flowEdge); + + nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge); + nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge); + } + + public void removeEdge(FlowEdge flowEdge) { + final FlowConsumer consumer = flowEdge.getConsumer(); + final FlowSupplier supplier = flowEdge.getSupplier(); + nodeToEdge.get((FlowNode) consumer).remove(flowEdge); + nodeToEdge.get((FlowNode) supplier).remove(flowEdge); + + edges.remove(flowEdge); + flowEdge.close(); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java new file mode 100644 index 00000000..d1faf465 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.engine; + +import java.time.InstantSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link FlowNode} represents a node in a {@link FlowGraph}. + */ +public abstract class FlowNode { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class); + + protected enum NodeState { + PENDING, // Stage is active, but is not running any updates + UPDATING, // Stage is active, and running an update + INVALIDATED, // Stage is deemed invalid, and should run an update + CLOSING, // Stage is being closed, final updates can still be run + CLOSED // Stage is closed and should not run any updates + } + + protected NodeState nodeState = NodeState.PENDING; + + /** + * The deadline of the stage after which an update should run. + */ + long deadline = Long.MAX_VALUE; + + /** + * The index of the timer in the {@link FlowTimerQueue}. + */ + int timerIndex = -1; + + protected InstantSource clock; + protected FlowGraph parentGraph; + protected FlowEngine engine; + + /** + * Construct a new {@link FlowNode} instance. + * + * @param parentGraph The {@link FlowGraph} this stage belongs to. + */ + public FlowNode(FlowGraph parentGraph) { + this.parentGraph = parentGraph; + this.engine = parentGraph.getEngine(); + this.clock = engine.getClock(); + + this.parentGraph.addNode(this); + } + + /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** + * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + public void setDeadline(long deadline) { + this.deadline = deadline; + } + + public void setTimerIndex(int index) { + this.timerIndex = index; + } + /** + * Invalidate the {@link FlowNode} forcing the stage to update. + * + * <p> + * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to + * prevent having to re-query the clock. This method should not be called during an update. + */ + public void invalidate(long now) { + // If there is already an update running, + // notify the update, that a next update should be run after + if (this.nodeState == NodeState.UPDATING) { + this.nodeState = NodeState.INVALIDATED; + } else { + engine.scheduleImmediate(now, this); + } + } + + /** + * Invalidate the {@link FlowNode} forcing the stage to update. + */ + public void invalidate() { + invalidate(clock.millis()); + } + + /** + * Update the state of the stage. + */ + public void update(long now) { + this.nodeState = NodeState.UPDATING; + + long newDeadline = this.deadline; + + try { + newDeadline = this.onUpdate(now); + } catch (Exception e) { + doFail(e); + } + + // Check whether the stage is marked as closing. + if (this.nodeState == NodeState.INVALIDATED) { + newDeadline = now; + } + if (this.nodeState == NodeState.CLOSING) { + closeNode(); + return; + } + + this.deadline = newDeadline; + + // Update the timer queue with the new deadline + engine.scheduleDelayedInContext(this); + + this.nodeState = NodeState.PENDING; + } + + /** + * This method is invoked when the one of the stage's InPorts or OutPorts is invalidated. + * + * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. + * @return The next deadline for the stage. + */ + public abstract long onUpdate(long now); + + /** + * This method is invoked when an uncaught exception is caught by the engine. When this happens, the + */ + void doFail(Throwable cause) { + LOGGER.warn("Uncaught exception (closing stage)", cause); + + closeNode(); + } + + /** + * This method is invoked when the {@link FlowNode} exits successfully or due to failure. + */ + public void closeNode() { + if (this.nodeState == NodeState.CLOSED) { + // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed"); + return; + } + + // If this stage is running an update, notify it that is should close after. + if (this.nodeState == NodeState.UPDATING) { + // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active"); + this.nodeState = NodeState.CLOSING; + return; + } + + // Mark the stage as closed + this.nodeState = NodeState.CLOSED; + + // Remove stage from parent graph + this.parentGraph.removeNode(this); + + // Remove stage from the timer queue + this.deadline = Long.MAX_VALUE; + this.engine.scheduleDelayedInContext(this); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java index 56ec7702..37b3c65b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java @@ -20,35 +20,35 @@ * SOFTWARE. */ -package org.opendc.simulator.flow2; +package org.opendc.simulator.engine; import java.util.ArrayDeque; import java.util.Arrays; /** - * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s * that have been updated during the engine cycle and should converge. * <p> * By using a specialized class, we reduce the overhead caused by type-erasure. */ -final class FlowStageQueue { +final class FlowNodeQueue { /** * The array of elements in the queue. */ - private FlowStage[] elements; + private FlowNode[] elements; private int head = 0; private int tail = 0; - public FlowStageQueue(int initialCapacity) { - elements = new FlowStage[initialCapacity]; + public FlowNodeQueue(int initialCapacity) { + elements = new FlowNode[initialCapacity]; } /** * Add the specified context to the queue. */ - void add(FlowStage ctx) { - final FlowStage[] es = elements; + void add(FlowNode ctx) { + final FlowNode[] es = elements; int tail = this.tail; es[tail] = ctx; @@ -62,12 +62,12 @@ final class FlowStageQueue { } /** - * Remove a {@link FlowStage} from the queue or <code>null</code> if the queue is empty. + * Remove a {@link FlowNode} from the queue or <code>null</code> if the queue is empty. */ - FlowStage poll() { - final FlowStage[] es = elements; + FlowNode poll() { + final FlowNode[] es = elements; int head = this.head; - FlowStage ctx = es[head]; + FlowNode ctx = es[head]; if (ctx != null) { es[head] = null; @@ -87,7 +87,7 @@ final class FlowStageQueue { throw new IllegalStateException("Sorry, deque too big"); } - final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity); + final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity); // Exceptionally, here tail == head needs to be disambiguated if (tail < head || (tail == head && es[head] != null)) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java index f9432c33..87729fca 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 AtLarge Research + * Copyright (c) 2024 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,17 +20,17 @@ * SOFTWARE. */ -package org.opendc.simulator.flow2.source; +package org.opendc.simulator.engine; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.Outlet; +public interface FlowSupplier { -/** - * A {@link FlowStage} with a single output. - */ -public interface FlowSource { - /** - * Return the output of this {@link FlowSource}. - */ - Outlet getOutput(); + void handleDemand(FlowEdge consumerEdge, float newDemand); + + void pushSupply(FlowEdge consumerEdge, float newSupply); + + void addConsumerEdge(FlowEdge consumerEdge); + + void removeConsumerEdge(FlowEdge consumerEdge); + + float getCapacity(); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java index 4b746202..1e348b10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java @@ -20,21 +20,21 @@ * SOFTWARE. */ -package org.opendc.simulator.flow2; +package org.opendc.simulator.engine; import java.util.Arrays; /** - * A specialized priority queue for timers of {@link FlowStageLogic}s. + * A specialized priority queue for timers of {@link FlowNode}s. * <p> * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation * being generic. */ -final class FlowTimerQueue { +public final class FlowTimerQueue { /** - * Array representation of binary heap of {@link FlowStage} instances. + * Array representation of binary heap of {@link FlowNode} instances. */ - private FlowStage[] queue; + private FlowNode[] queue; /** * The number of elements in the priority queue. @@ -47,21 +47,21 @@ final class FlowTimerQueue { * @param initialCapacity The initial capacity of the queue. */ public FlowTimerQueue(int initialCapacity) { - this.queue = new FlowStage[initialCapacity]; + this.queue = new FlowNode[initialCapacity]; } /** * Enqueue a timer for the specified context or update the existing timer. */ - void enqueue(FlowStage ctx) { - FlowStage[] es = queue; - int k = ctx.timerIndex; + public void enqueue(FlowNode node) { + FlowNode[] es = queue; + int k = node.timerIndex; - if (ctx.deadline != Long.MAX_VALUE) { + if (node.deadline != Long.MAX_VALUE) { if (k >= 0) { - update(es, ctx, k); + update(es, node, k); } else { - add(es, ctx); + add(es, node); } } else if (k >= 0) { delete(es, k); @@ -74,14 +74,13 @@ final class FlowTimerQueue { * @param now The timestamp that the deadline of the head of the queue should not exceed. * @return The head of the queue if its deadline does not exceed <code>now</code>, otherwise <code>null</code>. */ - FlowStage poll(long now) { - int size = this.size; - if (size == 0) { + public FlowNode poll(long now) { + if (this.size == 0) { return null; } - final FlowStage[] es = queue; - final FlowStage head = es[0]; + final FlowNode[] es = queue; + final FlowNode head = es[0]; if (now < head.deadline) { return null; @@ -89,7 +88,7 @@ final class FlowTimerQueue { int n = size - 1; this.size = n; - final FlowStage next = es[n]; + final FlowNode next = es[n]; es[n] = null; // Clear the last element of the queue if (n > 0) { @@ -103,9 +102,9 @@ final class FlowTimerQueue { /** * Find the earliest deadline in the queue. */ - long peekDeadline() { - if (size > 0) { - return queue[0].deadline; + public long peekDeadline() { + if (this.size > 0) { + return this.queue[0].deadline; } return Long.MAX_VALUE; @@ -114,43 +113,41 @@ final class FlowTimerQueue { /** * Add a new entry to the queue. */ - private void add(FlowStage[] es, FlowStage ctx) { - int i = size; - - if (i >= es.length) { + private void add(FlowNode[] es, FlowNode node) { + if (this.size >= es.length) { // Re-fetch the resized array es = grow(); } - siftUp(i, ctx, es); + siftUp(this.size, node, es); - size = i + 1; + this.size++; } /** * Update the deadline of an existing entry in the queue. */ - private void update(FlowStage[] es, FlowStage ctx, int k) { + private void update(FlowNode[] es, FlowNode node, int k) { if (k > 0) { int parent = (k - 1) >>> 1; - if (es[parent].deadline > ctx.deadline) { - siftUp(k, ctx, es); + if (es[parent].deadline > node.deadline) { + siftUp(k, node, es); return; } } - siftDown(k, ctx, es, size); + siftDown(k, node, es, this.size); } /** * Deadline an entry from the queue. */ - private void delete(FlowStage[] es, int k) { - int s = --size; + private void delete(FlowNode[] es, int k) { + int s = --this.size; if (s == k) { es[k] = null; // Element is last in the queue } else { - FlowStage moved = es[s]; + FlowNode moved = es[s]; es[s] = null; siftDown(k, moved, es, s); @@ -164,8 +161,8 @@ final class FlowTimerQueue { /** * Increases the capacity of the array. */ - private FlowStage[] grow() { - FlowStage[] queue = this.queue; + private FlowNode[] grow() { + FlowNode[] queue = this.queue; int oldCapacity = queue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); @@ -174,10 +171,10 @@ final class FlowTimerQueue { return queue; } - private static void siftUp(int k, FlowStage key, FlowStage[] es) { + private static void siftUp(int k, FlowNode key, FlowNode[] es) { while (k > 0) { int parent = (k - 1) >>> 1; - FlowStage e = es[parent]; + FlowNode e = es[parent]; if (key.deadline >= e.deadline) break; es[k] = e; e.timerIndex = k; @@ -187,11 +184,11 @@ final class FlowTimerQueue { key.timerIndex = k; } - private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) { + private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) { int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least - FlowStage c = es[child]; + FlowNode c = es[child]; int right = child + 1; if (right < n && c.deadline > es[right].deadline) c = es[child = right]; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java index a5b5114b..15da2f23 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.flow2; +package org.opendc.simulator.engine; import java.util.Arrays; @@ -29,7 +29,7 @@ import java.util.Arrays; * <p> * By using a specialized class, we reduce the overhead caused by type-erasure. */ -final class InvocationStack { +public final class InvocationStack { /** * The array of elements in the stack. */ @@ -38,8 +38,8 @@ final class InvocationStack { private int head = -1; public InvocationStack(int initialCapacity) { - elements = new long[initialCapacity]; - Arrays.fill(elements, Long.MIN_VALUE); + this.elements = new long[initialCapacity]; + Arrays.fill(this.elements, Long.MIN_VALUE); } /** @@ -48,8 +48,8 @@ final class InvocationStack { * @param invocation The timestamp of the invocation. * @return <code>true</code> if the invocation was added, <code>false</code> otherwise. */ - boolean tryAdd(long invocation) { - final long[] es = elements; + public boolean tryAdd(long invocation) { + final long[] es = this.elements; int head = this.head; if (head < 0 || es[head] > invocation) { @@ -69,12 +69,11 @@ final class InvocationStack { /** * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. */ - long poll() { - final long[] es = elements; + public long poll() { int head = this.head--; if (head >= 0) { - return es[head]; + return this.elements[head]; } return Long.MAX_VALUE; @@ -84,12 +83,12 @@ final class InvocationStack { * Doubles the capacity of this deque */ private void doubleCapacity() { - int oldCapacity = elements.length; + int oldCapacity = this.elements.length; int newCapacity = oldCapacity + (oldCapacity >> 1); if (newCapacity < 0) { throw new IllegalStateException("Sorry, deque too big"); } - elements = Arrays.copyOf(elements, newCapacity); + this.elements = Arrays.copyOf(this.elements, newCapacity); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java deleted file mode 100644 index f45be6cd..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an - * amount of flow that cannot exceed the edge's capacity. - */ -public interface FlowGraph { - /** - * Return the {@link FlowEngine} driving the simulation of the graph. - */ - FlowEngine getEngine(); - - /** - * Create a new {@link FlowStage} representing a node in the flow network. - * - * @param logic The logic for handling the events of the stage. - */ - FlowStage newStage(FlowStageLogic logic); - - /** - * Add an edge between the specified outlet port and inlet port in this graph. - * - * @param outlet The outlet of the source from which the flow originates. - * @param inlet The inlet of the sink that should receive the flow. - */ - void connect(Outlet outlet, Inlet inlet); - - /** - * Disconnect the specified {@link Outlet} (if connected). - * - * @param outlet The outlet to disconnect. - */ - void disconnect(Outlet outlet); - - /** - * Disconnect the specified {@link Inlet} (if connected). - * - * @param inlet The inlet to disconnect. - */ - void disconnect(Inlet inlet); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java deleted file mode 100644 index 0f608b60..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * Interface implemented by {@link FlowGraph} implementations. - */ -interface FlowGraphInternal extends FlowGraph { - /** - * Internal method to remove the specified {@link FlowStage} from the graph. - */ - void detach(FlowStage stage); - - /** - * Helper method to connect an outlet to an inlet. - */ - static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) { - if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) { - throw new IllegalArgumentException("Invalid outlet or inlet passed to graph"); - } - - InPort inPort = (InPort) inlet; - OutPort outPort = (OutPort) outlet; - - if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) { - throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); - } else if (outPort.input != null || inPort.output != null) { - throw new IllegalStateException("Inlet or outlet already connected"); - } - - outPort.input = inPort; - inPort.output = outPort; - - inPort.connect(); - outPort.connect(); - } - - /** - * Helper method to disconnect an outlet. - */ - static void disconnect(FlowGraph graph, Outlet outlet) { - if (!(outlet instanceof OutPort)) { - throw new IllegalArgumentException("Invalid outlet passed to graph"); - } - - OutPort outPort = (OutPort) outlet; - - if (!graph.equals(outPort.getGraph())) { - throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); - } - - outPort.cancel(null); - outPort.complete(); - } - - /** - * Helper method to disconnect an inlet. - */ - static void disconnect(FlowGraph graph, Inlet inlet) { - if (!(inlet instanceof InPort)) { - throw new IllegalArgumentException("Invalid outlet passed to graph"); - } - - InPort inPort = (InPort) inlet; - - if (!graph.equals(inPort.getGraph())) { - throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); - } - - inPort.finish(null); - inPort.cancel(null); - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java deleted file mode 100644 index 25f87e04..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -import java.time.InstantSource; -import java.util.HashMap; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link FlowStage} represents a node in a {@link FlowGraph}. - */ -public final class FlowStage { - private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class); - - /** - * States of the flow stage. - */ - private static final int STAGE_PENDING = 0; // Stage is pending to be started - - private static final int STAGE_ACTIVE = 1; // Stage is actively running - private static final int STAGE_CLOSED = 2; // Stage is closed - private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage - - /** - * Flags of the flow connection - */ - private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated - - private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed - private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active - private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending - - /** - * The flags representing the state and pending actions for the stage. - */ - private int flags = STAGE_PENDING; - - /** - * The deadline of the stage after which an update should run. - */ - long deadline = Long.MAX_VALUE; - - /** - * The index of the timer in the {@link FlowTimerQueue}. - */ - int timerIndex = -1; - - final InstantSource clock; - private final FlowStageLogic logic; - final FlowGraphInternal parentGraph; - private final FlowEngine engine; - - private final Map<String, InPort> inlets = new HashMap<>(); - private final Map<String, OutPort> outlets = new HashMap<>(); - private int nextInlet = 0; - private int nextOutlet = 0; - - /** - * Construct a new {@link FlowStage} instance. - * - * @param parentGraph The {@link FlowGraph} this stage belongs to. - * @param logic The logic of the stage. - */ - FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) { - this.parentGraph = parentGraph; - this.logic = logic; - this.engine = parentGraph.getEngine(); - this.clock = engine.getClock(); - } - - /** - * Return the {@link FlowGraph} to which this stage belongs. - */ - public FlowGraph getGraph() { - return parentGraph; - } - - /** - * Return the {@link Inlet} (an in-going edge) with the specified <code>name</code> for this {@link FlowStage}. - * If an inlet with that name does not exist, a new one is allocated for the stage. - * - * @param name The name of the inlet. - * @return The {@link InPort} representing an {@link Inlet} with the specified <code>name</code>. - */ - public InPort getInlet(String name) { - return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++)); - } - - /** - * Return the {@link Outlet} (an out-going edge) with the specified <code>name</code> for this {@link FlowStage}. - * If an outlet with that name does not exist, a new one is allocated for the stage. - * - * @param name The name of the outlet. - * @return The {@link OutPort} representing an {@link Outlet} with the specified <code>name</code>. - */ - public OutPort getOutlet(String name) { - return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++)); - } - - /** - * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch). - */ - public long getDeadline() { - return deadline; - } - - /** - * Set the deadline of the {@link FlowStage}'s timer. - * - * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted. - */ - public void setDeadline(long deadline) { - this.deadline = deadline; - - if ((flags & STAGE_UPDATE_ACTIVE) == 0) { - // Update the timer queue with the new deadline - engine.scheduleDelayed(this); - } - } - - /** - * Invalidate the {@link FlowStage} forcing the stage to update. - */ - public void invalidate() { - int flags = this.flags; - - if ((flags & STAGE_UPDATE_ACTIVE) == 0) { - scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE); - } - } - - /** - * Synchronously update the {@link FlowStage} at the current timestamp. - */ - public void sync() { - this.flags |= STAGE_INVALIDATE; - onUpdate(clock.millis()); - engine.scheduleDelayed(this); - } - - /** - * Close the {@link FlowStage} and disconnect all inlets and outlets. - */ - public void close() { - int flags = this.flags; - - if ((flags & STAGE_STATE) == STAGE_CLOSED) { - return; - } - - // Toggle the close bit. In case no update is active, schedule a new update. - if ((flags & STAGE_UPDATE_ACTIVE) != 0) { - this.flags = flags | STAGE_CLOSE; - } else { - scheduleImmediate(clock.millis(), flags | STAGE_CLOSE); - } - } - - /** - * Update the state of the flow stage. - * - * @param now The current virtual timestamp. - */ - void onUpdate(long now) { - int flags = this.flags; - int state = flags & STAGE_STATE; - - if (state == STAGE_ACTIVE) { - doUpdate(now, flags); - } else if (state == STAGE_PENDING) { - doStart(now, flags); - } - } - - /** - * Invalidate the {@link FlowStage} forcing the stage to update. - * - * <p> - * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to - * prevent having to re-query the clock. This method should not be called during an update. - */ - void invalidate(long now) { - scheduleImmediate(now, flags | STAGE_INVALIDATE); - } - - /** - * Schedule an immediate update for this stage. - */ - private void scheduleImmediate(long now, int flags) { - // In case an immediate update is already scheduled, no need to do anything - if ((flags & STAGE_UPDATE_PENDING) != 0) { - this.flags = flags; - return; - } - - // Mark the stage that there is an update pending - this.flags = flags | STAGE_UPDATE_PENDING; - - engine.scheduleImmediate(now, this); - } - - /** - * Start the stage. - */ - private void doStart(long now, int flags) { - // Update state before calling into the outside world, so it observes a consistent state - flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE; - - doUpdate(now, flags); - } - - /** - * Update the state of the stage. - */ - private void doUpdate(long now, int flags) { - long deadline = this.deadline; - long newDeadline = deadline; - - // Update the stage if: - // (1) the timer of the stage has expired. - // (2) one of the input ports is pushed, - // (3) one of the output ports is pulled, - if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) { - // Update state before calling into the outside world, so it observes a consistent state - this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE; - - try { - newDeadline = logic.onUpdate(this, now); - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - flags = this.flags; - } catch (Exception e) { - doFail(e); - } - } - - // Check whether the stage is marked as closing. - if ((flags & STAGE_CLOSE) != 0) { - doClose(flags, null); - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - flags = this.flags; - } - - // Indicate that no update is active anymore and flush the flags - this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING); - this.deadline = newDeadline; - - // Update the timer queue with the new deadline - engine.scheduleDelayedInContext(this); - } - - /** - * This method is invoked when an uncaught exception is caught by the engine. When this happens, the - * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs. - */ - void doFail(Throwable cause) { - LOGGER.warn("Uncaught exception (closing stage)", cause); - - doClose(flags, cause); - } - - /** - * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure. - */ - private void doClose(int flags, Throwable cause) { - // Mark the stage as closed - this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED; - - // Remove stage from parent graph - parentGraph.detach(this); - - // Remove stage from the timer queue - setDeadline(Long.MAX_VALUE); - - // Cancel all input ports - for (InPort port : inlets.values()) { - if (port != null) { - port.cancel(cause); - } - } - - // Cancel all output ports - for (OutPort port : outlets.values()) { - if (port != null) { - port.fail(cause); - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java deleted file mode 100644 index 70986a35..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via - * out-going flows based on its potential inputs. - */ -public interface FlowStageLogic { - /** - * This method is invoked when the one of the stage's inlets or outlets is invalidated. - * - * @param ctx The context in which the stage runs. - * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. - * @return The next deadline for the stage. - */ - long onUpdate(FlowStage ctx, long now); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java deleted file mode 100644 index 839b01db..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}. - */ -public interface InHandler { - /** - * Return the actual flow rate over the input port. - * - * @param port The input port to which the flow was pushed. - * @return The actual flow rate over the port. - */ - default float getRate(InPort port) { - return Math.min(port.getDemand(), port.getCapacity()); - } - - /** - * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet. - * - * @param port The input port to which the flow was pushed. - * @param demand The rate of flow the output attempted to push to the port. - */ - void onPush(InPort port, float demand); - - /** - * This method is invoked when the input port is finished. - * - * @param port The input port that has finished. - * @param cause The cause of the input port being finished or <code>null</code> if the port completed successfully. - */ - void onUpstreamFinish(InPort port, Throwable cause); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java deleted file mode 100644 index 9d5b4bef..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * A collection of common {@link InHandler} implementations. - */ -public class InHandlers { - /** - * Prevent construction of this class. - */ - private InHandlers() {} - - /** - * Return an {@link InHandler} that does nothing. - */ - public static InHandler noop() { - return NoopInHandler.INSTANCE; - } - - /** - * No-op implementation of {@link InHandler}. - */ - private static final class NoopInHandler implements InHandler { - public static final InHandler INSTANCE = new NoopInHandler(); - - @Override - public void onPush(InPort port, float demand) {} - - @Override - public void onUpstreamFinish(InPort port, Throwable cause) {} - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java deleted file mode 100644 index 16fed4eb..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -import java.time.InstantSource; -import java.util.Objects; - -/** - * A port that consumes a flow. - * <p> - * Input ports are represented as in-going edges in the flow graph. - */ -public final class InPort implements Inlet { - private final int id; - - private float capacity; - private float demand; - - private boolean mask; - - OutPort output; - private InHandler handler = InHandlers.noop(); - private final InstantSource clock; - private final String name; - private final FlowStage stage; - - InPort(FlowStage stage, String name, int id) { - this.name = name; - this.id = id; - this.stage = stage; - this.clock = stage.clock; - } - - @Override - public FlowGraph getGraph() { - return stage.parentGraph; - } - - @Override - public String getName() { - return name; - } - - /** - * Return the identifier of the {@link InPort} with respect to its stage. - */ - public int getId() { - return id; - } - - /** - * Return the current capacity of the input port. - */ - public float getCapacity() { - return capacity; - } - - /** - * Return the current demand of flow of the input port. - */ - public float getDemand() { - return demand; - } - - /** - * Return the current rate of flow of the input port. - */ - public float getRate() { - return handler.getRate(this); - } - - /** - * Pull the flow with the specified <code>capacity</code> from the input port. - * - * @param capacity The maximum throughput that the stage can receive from the input port. - */ - public void pull(float capacity) { - this.capacity = capacity; - - OutPort output = this.output; - if (output != null) { - output.pull(capacity); - } - } - - /** - * Return the current {@link InHandler} of the input port. - */ - public InHandler getHandler() { - return handler; - } - - /** - * Set the {@link InHandler} of the input port. - */ - public void setHandler(InHandler handler) { - this.handler = handler; - } - - /** - * Return the mask of this port. - * <p> - * Stages ignore events originating from masked ports. - */ - public boolean getMask() { - return mask; - } - - /** - * (Un)mask the port. - */ - public void setMask(boolean mask) { - this.mask = mask; - } - - /** - * Disconnect the input port from its (potentially) connected outlet. - * <p> - * The inlet can still be used and re-connected to another outlet. - * - * @param cause The cause for disconnecting the port or <code>null</code> when no more flow is needed. - */ - public void cancel(Throwable cause) { - demand = 0.f; - - OutPort output = this.output; - if (output != null) { - this.output = null; - output.input = null; - output.cancel(cause); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - InPort port = (InPort) o; - return stage.equals(port.stage) && name.equals(port.name); - } - - @Override - public int hashCode() { - return Objects.hash(stage.parentGraph, name); - } - - /** - * This method is invoked when the inlet is connected to an outlet. - */ - void connect() { - OutPort output = this.output; - output.pull(capacity); - } - - /** - * Push a flow from an outlet to this inlet. - * - * @param demand The rate of flow to push. - */ - void push(float demand) { - // No-op when the rate is unchanged - if (this.demand == demand) { - return; - } - - try { - handler.onPush(this, demand); - this.demand = demand; - - if (!mask) { - stage.invalidate(clock.millis()); - } - } catch (Exception e) { - stage.doFail(e); - } - } - - /** - * This method is invoked by the connected {@link OutPort} when it finishes. - */ - void finish(Throwable cause) { - try { - long now = clock.millis(); - handler.onUpstreamFinish(this, cause); - this.demand = 0.f; - - if (!mask) { - stage.invalidate(now); - } - } catch (Exception e) { - stage.doFail(e); - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java deleted file mode 100644 index 723c6d6b..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}. - */ -public interface OutHandler { - /** - * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet. - * - * @param port The output port of which the capacity was changed. - * @param capacity The new capacity of the outlet. - */ - void onPull(OutPort port, float capacity); - - /** - * This method is invoked when the output port no longer accepts any flow. - * <p> - * After this callback no other callbacks will be called for this port. - * - * @param port The outlet that no longer accepts any flow. - * @param cause The cause of the output port no longer accepting any flow or <code>null</code> if the port closed - * successfully. - */ - void onDownstreamFinish(OutPort port, Throwable cause); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java deleted file mode 100644 index 8fbfda0d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * A collection of common {@link OutHandler} implementations. - */ -public class OutHandlers { - /** - * Prevent construction of this class. - */ - private OutHandlers() {} - - /** - * Return an {@link OutHandler} that does nothing. - */ - public static OutHandler noop() { - return NoopOutHandler.INSTANCE; - } - - /** - * No-op implementation of {@link OutHandler}. - */ - private static final class NoopOutHandler implements OutHandler { - public static final OutHandler INSTANCE = new NoopOutHandler(); - - @Override - public void onPull(OutPort port, float capacity) {} - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) {} - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java deleted file mode 100644 index 1f7ed4ee..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -import java.time.InstantSource; -import java.util.Objects; - -/** - * A port that outputs a flow. - * <p> - * Output ports are represented as out-going edges in the flow graph. - */ -public final class OutPort implements Outlet { - private final int id; - - private float capacity; - private float demand; - - private boolean mask; - - InPort input; - private OutHandler handler = OutHandlers.noop(); - private final String name; - private final FlowStage stage; - private final InstantSource clock; - - OutPort(FlowStage stage, String name, int id) { - this.name = name; - this.id = id; - this.stage = stage; - this.clock = stage.clock; - } - - @Override - public FlowGraph getGraph() { - return stage.parentGraph; - } - - @Override - public String getName() { - return name; - } - - /** - * Return the identifier of the {@link OutPort} with respect to its stage. - */ - public int getId() { - return id; - } - - /** - * Return the capacity of the output port. - */ - public float getCapacity() { - return capacity; - } - - /** - * Return the current demand of flow of the output port. - */ - public float getDemand() { - return demand; - } - - /** - * Return the current rate of flow of the input port. - */ - public float getRate() { - InPort input = this.input; - if (input != null) { - return input.getRate(); - } - - return 0.f; - } - - /** - * Return the current {@link OutHandler} of the output port. - */ - public OutHandler getHandler() { - return handler; - } - - /** - * Set the {@link OutHandler} of the output port. - */ - public void setHandler(OutHandler handler) { - this.handler = handler; - } - - /** - * Return the mask of this port. - * <p> - * Stages ignore events originating from masked ports. - */ - public boolean getMask() { - return mask; - } - - /** - * (Un)mask the port. - */ - public void setMask(boolean mask) { - this.mask = mask; - } - - /** - * Push the given flow rate over output port. - * - * @param rate The rate of the flow to push. - */ - public void push(float rate) { - demand = rate; - InPort input = this.input; - - if (input != null) { - input.push(rate); - } - } - - /** - * Signal to the downstream port that the output has completed successfully and disconnect the port from its input. - * <p> - * The output port can still be used and re-connected to another input. - */ - public void complete() { - fail(null); - } - - /** - * Signal a failure to the downstream port and disconnect the port from its input. - * <p> - * The output can still be used and re-connected to another input. - */ - public void fail(Throwable cause) { - capacity = 0.f; - - InPort input = this.input; - if (input != null) { - this.input = null; - input.output = null; - input.finish(cause); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OutPort port = (OutPort) o; - return stage.equals(port.stage) && name.equals(port.name); - } - - @Override - public int hashCode() { - return Objects.hash(stage.parentGraph, name); - } - - /** - * This method is invoked when the outlet is connected to an inlet. - */ - void connect() { - input.push(demand); - } - - /** - * Pull from this outlet with a specified capacity. - * - * @param capacity The capacity of the inlet. - */ - void pull(float capacity) { - // No-op when outlet is not active or the rate is unchanged - if (this.capacity == capacity) { - return; - } - - try { - handler.onPull(this, capacity); - this.capacity = capacity; - - if (!mask) { - stage.invalidate(clock.millis()); - } - } catch (Exception e) { - stage.doFail(e); - } - } - - /** - * This method is invoked by the connected {@link InPort} when downstream cancels the connection. - */ - void cancel(Throwable cause) { - try { - handler.onDownstreamFinish(this, cause); - this.capacity = 0.f; - - if (!mask) { - stage.invalidate(clock.millis()); - } - } catch (Exception e) { - stage.doFail(e); - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java deleted file mode 100644 index 32e19a3b..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2; - -/** - * An out-going edge in a {@link FlowGraph}. - */ -public interface Outlet { - /** - * Return the {@link FlowGraph} to which the outlet is exposed. - */ - FlowGraph getGraph(); - - /** - * Return the name of the outlet. - */ - String getName(); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java deleted file mode 100644 index dec98955..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.mux; - -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.Inlet; -import org.opendc.simulator.flow2.Outlet; - -/** - * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs. - */ -public interface FlowMultiplexer { - /** - * Return maximum number of inputs supported by the multiplexer. - */ - int getMaxInputs(); - - /** - * Return maximum number of outputs supported by the multiplexer. - */ - int getMaxOutputs(); - - /** - * Return the number of active inputs on this multiplexer. - */ - int getInputCount(); - - /** - * Allocate a new input on this multiplexer with the specified capacity.. - * - * @return The identifier of the input for this stage. - */ - Inlet newInput(); - - /** - * Release the input at the specified slot. - * - * @param inlet The inlet to release. - */ - void releaseInput(Inlet inlet); - - /** - * Return the number of active outputs on this multiplexer. - */ - int getOutputCount(); - - /** - * Allocate a new output on this multiplexer. - * - * @return The outlet for this stage. - */ - Outlet newOutput(); - - /** - * Release the output at the specified slot. - * - * @param outlet The outlet to release. - */ - void releaseOutput(Outlet outlet); - - /** - * Return the total input capacity of the {@link FlowMultiplexer}. - */ - float getCapacity(); - - /** - * Return the total input demand for the {@link FlowMultiplexer}. - */ - float getDemand(); - - /** - * Return the total input rate for the {@link FlowMultiplexer}. - */ - float getRate(); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java deleted file mode 100644 index 0b5b9141..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.mux; - -import org.opendc.simulator.flow2.FlowGraph; - -/** - * Factory interface for a {@link FlowMultiplexer} implementation. - */ -public interface FlowMultiplexerFactory { - /** - * Construct a new {@link FlowMultiplexer} belonging to the specified {@link FlowGraph}. - * - * @param graph The graph to which the multiplexer belongs. - */ - FlowMultiplexer newMultiplexer(FlowGraph graph); - - /** - * Return a {@link FlowMultiplexerFactory} for {@link ForwardingFlowMultiplexer} instances. - */ - static FlowMultiplexerFactory forwardingMultiplexer() { - return ForwardingFlowMultiplexer.FACTORY; - } - - /** - * Return a {@link FlowMultiplexerFactory} for {@link MaxMinFlowMultiplexer} instances. - */ - static FlowMultiplexerFactory maxMinMultiplexer() { - return MaxMinFlowMultiplexer.FACTORY; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java deleted file mode 100644 index e0564cd2..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.mux; - -import java.util.Arrays; -import java.util.BitSet; -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.InHandler; -import org.opendc.simulator.flow2.InPort; -import org.opendc.simulator.flow2.Inlet; -import org.opendc.simulator.flow2.OutHandler; -import org.opendc.simulator.flow2.OutPort; -import org.opendc.simulator.flow2.Outlet; - -/** - * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively. - * This means that a single input is directly connected to an output and that the multiplexer can only support as many - * inputs as outputs. - */ -public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { - /** - * Factory implementation for this implementation. - */ - static FlowMultiplexerFactory FACTORY = ForwardingFlowMultiplexer::new; - - public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler(); - public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler(); - - private final FlowStage stage; - - private InPort[] inlets; - private OutPort[] outlets; - private final BitSet activeInputs; - private final BitSet activeOutputs; - private final BitSet availableOutputs; - - private float capacity = 0.f; - private float demand = 0.f; - - public ForwardingFlowMultiplexer(FlowGraph graph) { - this.stage = graph.newStage(this); - - this.inlets = new InPort[4]; - this.activeInputs = new BitSet(); - this.outlets = new OutPort[4]; - this.activeOutputs = new BitSet(); - this.availableOutputs = new BitSet(); - } - - @Override - public float getCapacity() { - return capacity; - } - - @Override - public float getDemand() { - return demand; - } - - @Override - public float getRate() { - final BitSet activeOutputs = this.activeOutputs; - final OutPort[] outlets = this.outlets; - float rate = 0.f; - for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { - rate += outlets[i].getRate(); - } - return rate; - } - - @Override - public int getMaxInputs() { - return getOutputCount(); - } - - @Override - public int getMaxOutputs() { - return Integer.MAX_VALUE; - } - - @Override - public int getInputCount() { - return activeInputs.length(); - } - - @Override - public Inlet newInput() { - final BitSet activeInputs = this.activeInputs; - int slot = activeInputs.nextClearBit(0); - - InPort inPort = stage.getInlet("in" + slot); - inPort.setMask(true); - - InPort[] inlets = this.inlets; - if (slot >= inlets.length) { - int newLength = inlets.length + (inlets.length >> 1); - inlets = Arrays.copyOf(inlets, newLength); - this.inlets = inlets; - } - - final BitSet availableOutputs = this.availableOutputs; - int outSlot = availableOutputs.nextSetBit(0); - - if (outSlot < 0) { - throw new IllegalStateException("No capacity available for a new input"); - } - - inlets[slot] = inPort; - activeInputs.set(slot); - - OutPort outPort = outlets[outSlot]; - availableOutputs.clear(outSlot); - - inPort.setHandler(new ForwardingInHandler(outPort)); - outPort.setHandler(new ForwardingOutHandler(inPort)); - - inPort.pull(outPort.getCapacity()); - - return inPort; - } - - @Override - public void releaseInput(Inlet inlet) { - InPort port = (InPort) inlet; - int slot = port.getId(); - - final BitSet activeInputs = this.activeInputs; - - if (!activeInputs.get(slot)) { - return; - } - - port.cancel(null); - activeInputs.clear(slot); - - ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler(); - availableOutputs.set(inHandler.output.getId()); - - port.setHandler(IDLE_IN_HANDLER); - } - - @Override - public int getOutputCount() { - return activeOutputs.length(); - } - - @Override - public Outlet newOutput() { - final BitSet activeOutputs = this.activeOutputs; - int slot = activeOutputs.nextClearBit(0); - - OutPort port = stage.getOutlet("out" + slot); - OutPort[] outlets = this.outlets; - if (slot >= outlets.length) { - int newLength = outlets.length + (outlets.length >> 1); - outlets = Arrays.copyOf(outlets, newLength); - this.outlets = outlets; - } - outlets[slot] = port; - - activeOutputs.set(slot); - availableOutputs.set(slot); - - port.setHandler(IDLE_OUT_HANDLER); - - return port; - } - - @Override - public void releaseOutput(Outlet outlet) { - OutPort port = (OutPort) outlet; - int slot = port.getId(); - activeInputs.clear(slot); - availableOutputs.clear(slot); - port.complete(); - - port.setHandler(IDLE_OUT_HANDLER); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - return Long.MAX_VALUE; - } - - class ForwardingInHandler implements InHandler { - final OutPort output; - - ForwardingInHandler(OutPort output) { - this.output = output; - } - - @Override - public float getRate(InPort port) { - return output.getRate(); - } - - @Override - public void onPush(InPort port, float rate) { - ForwardingFlowMultiplexer.this.demand += -port.getDemand() + rate; - - output.push(rate); - } - - @Override - public void onUpstreamFinish(InPort port, Throwable cause) { - ForwardingFlowMultiplexer.this.demand -= port.getDemand(); - - final OutPort output = this.output; - output.push(0.f); - - releaseInput(port); - } - } - - private class ForwardingOutHandler implements OutHandler { - private final InPort input; - - ForwardingOutHandler(InPort input) { - this.input = input; - } - - @Override - public void onPull(OutPort port, float capacity) { - ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity; - - input.pull(capacity); - } - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - ForwardingFlowMultiplexer.this.capacity -= port.getCapacity(); - - input.cancel(cause); - - releaseOutput(port); - } - } - - private static class IdleInHandler implements InHandler { - @Override - public float getRate(InPort port) { - return 0.f; - } - - @Override - public void onPush(InPort port, float rate) { - port.cancel(new IllegalStateException("Inlet is not allocated")); - } - - @Override - public void onUpstreamFinish(InPort port, Throwable cause) {} - } - - private class IdleOutHandler implements OutHandler { - @Override - public void onPull(OutPort port, float capacity) { - ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity; - } - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - ForwardingFlowMultiplexer.this.capacity -= port.getCapacity(); - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java deleted file mode 100644 index ac5c4f5c..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.mux; - -import java.util.Arrays; -import java.util.BitSet; -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.InHandler; -import org.opendc.simulator.flow2.InPort; -import org.opendc.simulator.flow2.Inlet; -import org.opendc.simulator.flow2.OutHandler; -import org.opendc.simulator.flow2.OutPort; -import org.opendc.simulator.flow2.Outlet; - -/** - * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs - * using max-min fair sharing. - * <p> - * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined - * output capacity, but allows individual inputs to use more capacity if there is still capacity left. - */ -public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { - /** - * Factory implementation for this implementation. - */ - static FlowMultiplexerFactory FACTORY = MaxMinFlowMultiplexer::new; - - private final FlowStage stage; - private final BitSet activeInputs; - private final BitSet activeOutputs; - - private float capacity = 0.f; - private float demand = 0.f; - private float rate = 0.f; - - private InPort[] inlets; - private long[] inputs; - private float[] rates; - private OutPort[] outlets; - - private final MultiplexerInHandler inHandler = new MultiplexerInHandler(); - private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler(); - - /** - * Construct a {@link MaxMinFlowMultiplexer} instance. - * - * @param graph The {@link FlowGraph} to add the multiplexer to. - */ - public MaxMinFlowMultiplexer(FlowGraph graph) { - this.stage = graph.newStage(this); - this.activeInputs = new BitSet(); - this.activeOutputs = new BitSet(); - - this.inlets = new InPort[4]; - this.inputs = new long[4]; - this.rates = new float[4]; - this.outlets = new OutPort[4]; - } - - @Override - public float getCapacity() { - return capacity; - } - - @Override - public float getDemand() { - return demand; - } - - @Override - public float getRate() { - return rate; - } - - @Override - public int getMaxInputs() { - return Integer.MAX_VALUE; - } - - @Override - public int getMaxOutputs() { - return Integer.MAX_VALUE; - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - float capacity = this.capacity; - float demand = this.demand; - float rate = demand; - - if (demand > capacity) { - rate = redistributeCapacity(inlets, inputs, rates, capacity); - } - - if (this.rate != rate) { - // Only update the outputs if the output rate has changed - this.rate = rate; - - changeRate(activeOutputs, outlets, capacity, rate); - } - - return Long.MAX_VALUE; - } - - @Override - public int getInputCount() { - return activeInputs.length(); - } - - @Override - public Inlet newInput() { - final BitSet activeInputs = this.activeInputs; - int slot = activeInputs.nextClearBit(0); - - InPort port = stage.getInlet("in" + slot); - port.setHandler(inHandler); - port.pull(this.capacity); - - InPort[] inlets = this.inlets; - if (slot >= inlets.length) { - int newLength = inlets.length + (inlets.length >> 1); - inlets = Arrays.copyOf(inlets, newLength); - inputs = Arrays.copyOf(inputs, newLength); - rates = Arrays.copyOf(rates, newLength); - this.inlets = inlets; - } - inlets[slot] = port; - - activeInputs.set(slot); - return port; - } - - @Override - public void releaseInput(Inlet inlet) { - InPort port = (InPort) inlet; - - activeInputs.clear(port.getId()); - port.cancel(null); - } - - @Override - public int getOutputCount() { - return activeOutputs.length(); - } - - @Override - public Outlet newOutput() { - final BitSet activeOutputs = this.activeOutputs; - int slot = activeOutputs.nextClearBit(0); - - OutPort port = stage.getOutlet("out" + slot); - port.setHandler(outHandler); - - OutPort[] outlets = this.outlets; - if (slot >= outlets.length) { - int newLength = outlets.length + (outlets.length >> 1); - outlets = Arrays.copyOf(outlets, newLength); - this.outlets = outlets; - } - outlets[slot] = port; - - activeOutputs.set(slot); - return port; - } - - @Override - public void releaseOutput(Outlet outlet) { - OutPort port = (OutPort) outlet; - activeInputs.clear(port.getId()); - port.complete(); - } - - /** - * Helper function to redistribute the specified capacity across the inlets. - */ - private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) { - // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the - // constrained capacity across the inputs. - for (int i = 0; i < inputs.length; i++) { - InPort inlet = inlets[i]; - if (inlet == null) { - break; - } - - inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL); - } - Arrays.sort(inputs); - - float availableCapacity = capacity; - int inputSize = inputs.length; - - // Divide the available output capacity fairly over the inputs using max-min fair sharing - for (int i = 0; i < inputs.length; i++) { - long v = inputs[i]; - int slot = (int) v; - float d = Float.intBitsToFloat((int) (v >> 32)); - - if (d == 0.0) { - continue; - } - - float availableShare = availableCapacity / (inputSize - i); - float r = Math.min(d, availableShare); - - rates[slot] = r; - availableCapacity -= r; - } - - return capacity - availableCapacity; - } - - /** - * Helper method to change the rate of the outlets. - */ - private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) { - // Divide the requests over the available capacity of the input resources fairly - for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { - OutPort outlet = outlets[i]; - float fraction = outlet.getCapacity() / capacity; - outlet.push(rate * fraction); - } - } - - /** - * A {@link InHandler} implementation for the multiplexer inputs. - */ - private class MultiplexerInHandler implements InHandler { - @Override - public float getRate(InPort port) { - return rates[port.getId()]; - } - - @Override - public void onPush(InPort port, float demand) { - MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand; - rates[port.getId()] = demand; - } - - @Override - public void onUpstreamFinish(InPort port, Throwable cause) { - MaxMinFlowMultiplexer.this.demand -= port.getDemand(); - releaseInput(port); - rates[port.getId()] = 0.f; - } - } - - /** - * A {@link OutHandler} implementation for the multiplexer outputs. - */ - private class MultiplexerOutHandler implements OutHandler { - @Override - public void onPull(OutPort port, float capacity) { - float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity; - MaxMinFlowMultiplexer.this.capacity = newCapacity; - changeInletCapacity(newCapacity); - } - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity(); - MaxMinFlowMultiplexer.this.capacity = newCapacity; - releaseOutput(port); - changeInletCapacity(newCapacity); - } - - private void changeInletCapacity(float capacity) { - BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs; - InPort[] inlets = MaxMinFlowMultiplexer.this.inlets; - - for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) { - inlets[i].pull(capacity); - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java deleted file mode 100644 index 69c94708..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.sink; - -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.Inlet; - -/** - * A {@link FlowStage} with a single input. - */ -public interface FlowSink { - /** - * Return the input of this {@link FlowSink}. - */ - Inlet getInput(); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java deleted file mode 100644 index fdfe5ee8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.sink; - -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.InHandler; -import org.opendc.simulator.flow2.InPort; -import org.opendc.simulator.flow2.Inlet; - -/** - * A sink with a fixed capacity. - */ -public final class SimpleFlowSink implements FlowSink, FlowStageLogic { - private final FlowStage stage; - private final InPort input; - private final Handler handler; - - /** - * Construct a new {@link SimpleFlowSink} with the specified initial capacity. - * - * @param graph The graph to add the sink to. - * @param initialCapacity The initial capacity of the sink. - */ - public SimpleFlowSink(FlowGraph graph, float initialCapacity) { - this.stage = graph.newStage(this); - this.handler = new Handler(); - this.input = stage.getInlet("in"); - this.input.pull(initialCapacity); - this.input.setMask(true); - this.input.setHandler(handler); - } - - /** - * Return the {@link Inlet} of this sink. - */ - @Override - public Inlet getInput() { - return input; - } - - /** - * Return the capacity of the sink. - */ - public float getCapacity() { - return input.getCapacity(); - } - - /** - * Update the capacity of the sink. - * - * @param capacity The new capacity to update the sink to. - */ - public void setCapacity(float capacity) { - input.pull(capacity); - stage.invalidate(); - } - - /** - * Return the flow rate of the sink. - */ - public float getRate() { - return input.getRate(); - } - - /** - * Remove this node from the graph. - */ - public void close() { - stage.close(); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - InPort input = this.input; - handler.rate = Math.min(input.getDemand(), input.getCapacity()); - return Long.MAX_VALUE; - } - - /** - * The {@link InHandler} implementation for the sink. - */ - private static final class Handler implements InHandler { - float rate; - - @Override - public float getRate(InPort port) { - return rate; - } - - @Override - public void onPush(InPort port, float demand) { - float capacity = port.getCapacity(); - rate = Math.min(demand, capacity); - } - - @Override - public void onUpstreamFinish(InPort port, Throwable cause) { - rate = 0.f; - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java deleted file mode 100644 index 2dcc66e4..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.source; - -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.OutPort; -import org.opendc.simulator.flow2.Outlet; - -/** - * An empty {@link FlowSource}. - */ -public final class EmptyFlowSource implements FlowSource, FlowStageLogic { - private final FlowStage stage; - private final OutPort output; - - /** - * Construct a new {@link EmptyFlowSource}. - */ - public EmptyFlowSource(FlowGraph graph) { - this.stage = graph.newStage(this); - this.output = stage.getOutlet("out"); - } - - /** - * Return the {@link Outlet} of the source. - */ - @Override - public Outlet getOutput() { - return output; - } - - /** - * Remove this node from the graph. - */ - public void close() { - stage.close(); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - return Long.MAX_VALUE; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java deleted file mode 100644 index c09987cd..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.source; - -import java.util.function.Consumer; -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.OutHandler; -import org.opendc.simulator.flow2.OutPort; -import org.opendc.simulator.flow2.Outlet; - -/** - * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization. - */ -public class RuntimeFlowSource implements FlowSource, FlowStageLogic { - private final float utilization; - - private final FlowStage stage; - private final OutPort output; - private final Consumer<RuntimeFlowSource> completionHandler; - - private long duration; - private long lastPull; - - /** - * Construct a {@link RuntimeFlowSource} instance. - * - * @param graph The {@link FlowGraph} to which this source belongs. - * @param duration The duration of the source. - * @param utilization The utilization of the capacity of the outlet. - * @param completionHandler A callback invoked when the source completes. - */ - public RuntimeFlowSource( - FlowGraph graph, long duration, float utilization, Consumer<RuntimeFlowSource> completionHandler) { - if (duration <= 0) { - throw new IllegalArgumentException("Duration must be positive and non-zero"); - } - - if (utilization <= 0.0) { - throw new IllegalArgumentException("Utilization must be positive and non-zero"); - } - - this.stage = graph.newStage(this); - this.output = stage.getOutlet("out"); - this.output.setHandler(new OutHandler() { - @Override - public void onPull(OutPort port, float capacity) {} - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - // Source cannot complete without re-connecting to another sink, so mark the source as completed - completionHandler.accept(RuntimeFlowSource.this); - } - }); - this.duration = duration; - this.utilization = utilization; - this.completionHandler = completionHandler; - this.lastPull = graph.getEngine().getClock().millis(); - } - - /** - * Construct a new {@link RuntimeFlowSource}. - * - * @param graph The {@link FlowGraph} to which this source belongs. - * @param duration The duration of the source. - * @param utilization The utilization of the capacity of the outlet. - */ - public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) { - this(graph, duration, utilization, RuntimeFlowSource::close); - } - - /** - * Return the {@link Outlet} of the source. - */ - @Override - public Outlet getOutput() { - return output; - } - - /** - * Remove this node from the graph. - */ - public void close() { - stage.close(); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - long lastPull = this.lastPull; - this.lastPull = now; - - long delta = Math.max(0, now - lastPull); - - OutPort output = this.output; - float limit = output.getCapacity() * utilization; - long duration = this.duration - delta; - - if (duration <= 0) { - completionHandler.accept(this); - return Long.MAX_VALUE; - } - - this.duration = duration; - output.push(limit); - return now + duration; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java deleted file mode 100644 index a0e9cb9d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.source; - -import java.util.function.Consumer; -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.OutHandler; -import org.opendc.simulator.flow2.OutPort; -import org.opendc.simulator.flow2.Outlet; - -/** - * A flow source that contains a fixed amount and is pushed with a given utilization. - */ -public final class SimpleFlowSource implements FlowSource, FlowStageLogic { - private final float utilization; - private float remainingAmount; - private long lastPull; - - private final FlowStage stage; - private final OutPort output; - private final Consumer<SimpleFlowSource> completionHandler; - - /** - * Construct a new {@link SimpleFlowSource}. - * - * @param graph The {@link FlowGraph} to which this source belongs. - * @param amount The amount to transfer via the outlet. - * @param utilization The utilization of the capacity of the outlet. - * @param completionHandler A callback invoked when the source completes. - */ - public SimpleFlowSource( - FlowGraph graph, float amount, float utilization, Consumer<SimpleFlowSource> completionHandler) { - if (amount < 0.0) { - throw new IllegalArgumentException("Amount must be non-negative"); - } - - if (utilization <= 0.0) { - throw new IllegalArgumentException("Utilization must be positive and non-zero"); - } - - this.stage = graph.newStage(this); - this.output = stage.getOutlet("out"); - this.output.setHandler(new OutHandler() { - @Override - public void onPull(OutPort port, float capacity) {} - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - // Source cannot complete without re-connecting to another sink, so mark the source as completed - completionHandler.accept(SimpleFlowSource.this); - } - }); - this.completionHandler = completionHandler; - this.utilization = utilization; - this.remainingAmount = amount; - this.lastPull = graph.getEngine().getClock().millis(); - } - - /** - * Construct a new {@link SimpleFlowSource}. - * - * @param graph The {@link FlowGraph} to which this source belongs. - * @param amount The amount to transfer via the outlet. - * @param utilization The utilization of the capacity of the outlet. - */ - public SimpleFlowSource(FlowGraph graph, float amount, float utilization) { - this(graph, amount, utilization, SimpleFlowSource::close); - } - - /** - * Return the {@link Outlet} of the source. - */ - @Override - public Outlet getOutput() { - return output; - } - - /** - * Remove this node from the graph. - */ - public void close() { - stage.close(); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - long lastPull = this.lastPull; - this.lastPull = now; - - long delta = Math.max(0, now - lastPull); - - OutPort output = this.output; - float consumed = output.getRate() * delta / 1000.f; - float limit = output.getCapacity() * utilization; - - float remainingAmount = this.remainingAmount - consumed; - this.remainingAmount = remainingAmount; - - long duration = (long) Math.ceil(remainingAmount / limit * 1000); - - if (duration <= 0) { - completionHandler.accept(this); - return Long.MAX_VALUE; - } - - output.push(limit); - return now + duration; - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java deleted file mode 100644 index e8abc2d7..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.source; - -import java.util.function.Consumer; -import org.opendc.simulator.flow2.FlowGraph; -import org.opendc.simulator.flow2.FlowStage; -import org.opendc.simulator.flow2.FlowStageLogic; -import org.opendc.simulator.flow2.OutHandler; -import org.opendc.simulator.flow2.OutPort; -import org.opendc.simulator.flow2.Outlet; - -/** - * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time. - */ -public final class TraceFlowSource implements FlowSource, FlowStageLogic { - private final OutPort output; - private final long[] deadlines; - private final float[] usages; - private final int size; - private int index; - - private final FlowStage stage; - private final Consumer<TraceFlowSource> completionHandler; - - /** - * Construct a {@link TraceFlowSource}. - * - * @param graph The {@link FlowGraph} to which the source belongs. - * @param trace The {@link Trace} to replay. - * @param completionHandler The completion handler to invoke when the source finishes. - */ - public TraceFlowSource(FlowGraph graph, Trace trace, Consumer<TraceFlowSource> completionHandler) { - this.stage = graph.newStage(this); - this.output = stage.getOutlet("out"); - this.output.setHandler(new OutHandler() { - @Override - public void onPull(OutPort port, float capacity) {} - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - // Source cannot complete without re-connecting to another sink, so mark the source as completed - completionHandler.accept(TraceFlowSource.this); - } - }); - this.deadlines = trace.deadlines; - this.usages = trace.usages; - this.size = trace.size; - this.completionHandler = completionHandler; - } - - /** - * Construct a {@link TraceFlowSource}. - * - * @param graph The {@link FlowGraph} to which the source belongs. - * @param trace The {@link Trace} to replay. - */ - public TraceFlowSource(FlowGraph graph, Trace trace) { - this(graph, trace, TraceFlowSource::close); - } - - @Override - public Outlet getOutput() { - return output; - } - - /** - * Remove this node from the graph. - */ - public void close() { - stage.close(); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - int size = this.size; - int index = this.index; - long[] deadlines = this.deadlines; - long deadline; - - do { - deadline = deadlines[index]; - } while (deadline <= now && ++index < size); - - if (index >= size) { - output.push(0.0f); - completionHandler.accept(this); - return Long.MAX_VALUE; - } - - this.index = index; - float usage = usages[index]; - output.push(usage); - - return deadline; - } - - /** - * A trace describes the workload over time. - */ - public static final class Trace { - private final long[] deadlines; - private final float[] usages; - private final int size; - - /** - * Construct a {@link Trace}. - * - * @param deadlines The deadlines of the trace fragments. - * @param usages The usages of the trace fragments. - * @param size The size of the trace. - */ - public Trace(long[] deadlines, float[] usages, int size) { - this.deadlines = deadlines; - this.usages = usages; - this.size = size; - } - - public long[] getDeadlines() { - return deadlines; - } - - public float[] getUsages() { - return usages; - } - - public int getSize() { - return size; - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java deleted file mode 100644 index 51ea7df3..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.util; - -import org.opendc.simulator.flow2.FlowGraph; - -/** - * A {@link FlowTransform} describes a transformation between two components in a {@link FlowGraph} that might operate - * at different units of flow. - */ -public interface FlowTransform { - /** - * Apply the transform to the specified flow rate. - */ - float apply(float value); - - /** - * Apply the inverse of the transformation to the specified flow rate. - */ - float applyInverse(float value); -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java deleted file mode 100644 index 852240d8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.util; - -import org.opendc.simulator.flow2.*; -import org.opendc.simulator.flow2.sink.FlowSink; -import org.opendc.simulator.flow2.source.FlowSource; - -/** - * Helper class to transform flow from outlet to inlet. - */ -public final class FlowTransformer implements FlowStageLogic, FlowSource, FlowSink { - private final FlowStage stage; - private final InPort input; - private final OutPort output; - - /** - * Construct a new {@link FlowTransformer}. - */ - public FlowTransformer(FlowGraph graph, FlowTransform transform) { - this.stage = graph.newStage(this); - this.input = stage.getInlet("in"); - this.output = stage.getOutlet("out"); - - this.input.setHandler(new ForwardInHandler(output, transform)); - this.input.setMask(true); - this.output.setHandler(new ForwardOutHandler(input, transform)); - this.output.setMask(true); - } - - /** - * Return the {@link Outlet} of the transformer. - */ - @Override - public Outlet getOutput() { - return output; - } - - /** - * Return the {@link Inlet} of the transformer. - */ - @Override - public Inlet getInput() { - return input; - } - - /** - * Close the transformer. - */ - void close() { - stage.close(); - } - - @Override - public long onUpdate(FlowStage ctx, long now) { - return Long.MAX_VALUE; - } - - private static class ForwardInHandler implements InHandler { - private final OutPort output; - private final FlowTransform transform; - - ForwardInHandler(OutPort output, FlowTransform transform) { - this.output = output; - this.transform = transform; - } - - @Override - public float getRate(InPort port) { - return transform.applyInverse(output.getRate()); - } - - @Override - public void onPush(InPort port, float demand) { - float rate = transform.apply(demand); - output.push(rate); - } - - @Override - public void onUpstreamFinish(InPort port, Throwable cause) { - output.fail(cause); - } - } - - private static class ForwardOutHandler implements OutHandler { - private final InPort input; - private final FlowTransform transform; - - ForwardOutHandler(InPort input, FlowTransform transform) { - this.input = input; - this.transform = transform; - } - - @Override - public void onPull(OutPort port, float capacity) { - input.pull(transform.applyInverse(capacity)); - } - - @Override - public void onDownstreamFinish(OutPort port, Throwable cause) { - input.cancel(cause); - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java deleted file mode 100644 index 428dbfca..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow2.util; - -/** - * A collection of common {@link FlowTransform} implementations. - */ -public class FlowTransforms { - /** - * Prevent construction of this class. - */ - private FlowTransforms() {} - - /** - * Return a {@link FlowTransform} that forwards the flow rate unmodified. - */ - public static FlowTransform noop() { - return NoopFlowTransform.INSTANCE; - } - - /** - * No-op implementation of a {@link FlowTransform}. - */ - private static final class NoopFlowTransform implements FlowTransform { - static final NoopFlowTransform INSTANCE = new NoopFlowTransform(); - - @Override - public float apply(float value) { - return value; - } - - @Override - public float applyInverse(float value) { - return value; - } - } -} |
