From ad88144923d76dfc421f0b22a0b4e670b3f6366e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 21 Aug 2022 14:27:41 +0200 Subject: perf(sim/flow): Add support for multi-flow stages This change adds support for creating nodes in a flow graph that support multiple inputs and outputs directly, instead of our current approach where we need to re-implement the `FlowConsumerContext` interface in order to support multiple inputs or outputs. --- .../org/opendc/simulator/flow2/FlowEngine.java | 260 ++++++++++++++++++ .../java/org/opendc/simulator/flow2/FlowGraph.java | 63 +++++ .../opendc/simulator/flow2/FlowGraphInternal.java | 93 +++++++ .../java/org/opendc/simulator/flow2/FlowStage.java | 303 +++++++++++++++++++++ .../org/opendc/simulator/flow2/FlowStageLogic.java | 38 +++ .../org/opendc/simulator/flow2/FlowStageQueue.java | 109 ++++++++ .../org/opendc/simulator/flow2/FlowTimerQueue.java | 208 ++++++++++++++ .../java/org/opendc/simulator/flow2/InHandler.java | 54 ++++ .../org/opendc/simulator/flow2/InHandlers.java | 53 ++++ .../java/org/opendc/simulator/flow2/InPort.java | 214 +++++++++++++++ .../java/org/opendc/simulator/flow2/Inlet.java | 38 +++ .../opendc/simulator/flow2/InvocationStack.java | 95 +++++++ .../org/opendc/simulator/flow2/OutHandler.java | 47 ++++ .../org/opendc/simulator/flow2/OutHandlers.java | 53 ++++ .../java/org/opendc/simulator/flow2/OutPort.java | 224 +++++++++++++++ .../java/org/opendc/simulator/flow2/Outlet.java | 38 +++ .../simulator/flow2/mux/FlowMultiplexer.java | 70 +++++ .../simulator/flow2/mux/MaxMinFlowMultiplexer.java | 268 ++++++++++++++++++ .../org/opendc/simulator/flow2/sink/FlowSink.java | 36 +++ .../simulator/flow2/sink/SimpleFlowSink.java | 123 +++++++++ .../simulator/flow2/source/EmptyFlowSource.java | 65 +++++ .../opendc/simulator/flow2/source/FlowSource.java | 36 +++ .../simulator/flow2/source/RuntimeFlowSource.java | 129 +++++++++ .../simulator/flow2/source/SimpleFlowSource.java | 132 +++++++++ .../simulator/flow2/source/TraceFlowSource.java | 152 +++++++++++ 25 files changed, 2901 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java new file mode 100644 index 00000000..0ebb0da9 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import kotlin.coroutines.ContinuationInterceptor; +import kotlin.coroutines.CoroutineContext; +import kotlinx.coroutines.Delay; + +/** + * A {@link FlowEngine} simulates a generic flow network. + *

+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public final class FlowEngine implements Runnable { + /** + * The queue of {@link FlowStage} updates that are scheduled for immediate execution. + */ + private final FlowStageQueue queue = new FlowStageQueue(256); + + /** + * A priority queue containing the {@link FlowStage} updates to be scheduled in the future. + */ + private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + + /** + * The stack of engine invocations to occur in the future. + */ + private final InvocationStack futureInvocations = new InvocationStack(256); + + /** + * A flag to indicate that the engine is active. + */ + private boolean active; + + private final CoroutineContext coroutineContext; + private final Clock clock; + private final Delay delay; + + /** + * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}. + */ + public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) { + return new FlowEngine(coroutineContext, clock); + } + + FlowEngine(CoroutineContext coroutineContext, Clock clock) { + this.coroutineContext = coroutineContext; + this.clock = clock; + + CoroutineContext.Key key = ContinuationInterceptor.Key; + this.delay = (Delay) coroutineContext.get(key); + } + + /** + * Obtain the (virtual) {@link Clock} driving the simulation. + */ + public Clock getClock() { + return clock; + } + + /** + * Return a new {@link FlowGraph} that can be used to build a flow network. + */ + public FlowGraph newGraph() { + return new RootGraph(this); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + */ + void scheduleImmediate(long now, FlowStage ctx) { + scheduleImmediateInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + trySchedule(futureInvocations, now, now); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + *

+ * This method should only be invoked while inside an engine cycle. + */ + void scheduleImmediateInContext(FlowStage ctx) { + queue.add(ctx); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + */ + void scheduleDelayed(FlowStage ctx) { + scheduleDelayedInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + long deadline = timerQueue.peekDeadline(); + if (deadline != Long.MAX_VALUE) { + trySchedule(futureInvocations, clock.millis(), deadline); + } + } + + /** + * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + *

+ * This method should only be invoked while inside an engine cycle. + */ + void scheduleDelayedInContext(FlowStage ctx) { + FlowTimerQueue timerQueue = this.timerQueue; + timerQueue.enqueue(ctx); + } + + /** + * Run all the enqueued actions for the specified timestamp (now). + */ + private void doRunEngine(long now) { + final FlowStageQueue queue = this.queue; + final FlowTimerQueue timerQueue = this.timerQueue; + + try { + // Mark the engine as active to prevent concurrent calls to this method + active = true; + + // Execute all scheduled updates at current timestamp + while (true) { + final FlowStage ctx = timerQueue.poll(now); + if (ctx == null) { + break; + } + + ctx.onUpdate(now); + } + + // Execute all immediate updates + while (true) { + final FlowStage ctx = queue.poll(); + if (ctx == null) { + break; + } + + ctx.onUpdate(now); + } + } finally { + active = false; + } + + // Schedule an engine invocation for the next update to occur. + long headDeadline = timerQueue.peekDeadline(); + if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { + trySchedule(futureInvocations, now, headDeadline); + } + } + + @Override + public void run() { + doRunEngine(futureInvocations.poll()); + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param scheduled The queue of scheduled invocations. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + */ + private void trySchedule(InvocationStack scheduled, long now, long target) { + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (scheduled.tryAdd(target)) { + delay.invokeOnTimeout(target - now, this, coroutineContext); + } + } + + /** + * Internal implementation of a root {@link FlowGraph}. + */ + private static final class RootGraph implements FlowGraphInternal { + private final FlowEngine engine; + private final List stages = new ArrayList<>(); + + public RootGraph(FlowEngine engine) { + this.engine = engine; + } + + @Override + public FlowEngine getEngine() { + return engine; + } + + @Override + public FlowStage newStage(FlowStageLogic logic) { + final FlowEngine engine = this.engine; + final FlowStage stage = new FlowStage(this, logic); + stages.add(stage); + long now = engine.getClock().millis(); + stage.invalidate(now); + return stage; + } + + @Override + public void connect(Outlet outlet, Inlet inlet) { + FlowGraphInternal.connect(this, outlet, inlet); + } + + @Override + public void disconnect(Outlet outlet) { + FlowGraphInternal.disconnect(this, outlet); + } + + @Override + public void disconnect(Inlet inlet) { + FlowGraphInternal.disconnect(this, inlet); + } + + /** + * Internal method to remove the specified {@link FlowStage} from the graph. + */ + @Override + public void detach(FlowStage stage) { + stages.remove(stage); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java new file mode 100644 index 00000000..f45be6cd --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an + * amount of flow that cannot exceed the edge's capacity. + */ +public interface FlowGraph { + /** + * Return the {@link FlowEngine} driving the simulation of the graph. + */ + FlowEngine getEngine(); + + /** + * Create a new {@link FlowStage} representing a node in the flow network. + * + * @param logic The logic for handling the events of the stage. + */ + FlowStage newStage(FlowStageLogic logic); + + /** + * Add an edge between the specified outlet port and inlet port in this graph. + * + * @param outlet The outlet of the source from which the flow originates. + * @param inlet The inlet of the sink that should receive the flow. + */ + void connect(Outlet outlet, Inlet inlet); + + /** + * Disconnect the specified {@link Outlet} (if connected). + * + * @param outlet The outlet to disconnect. + */ + void disconnect(Outlet outlet); + + /** + * Disconnect the specified {@link Inlet} (if connected). + * + * @param inlet The inlet to disconnect. + */ + void disconnect(Inlet inlet); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java new file mode 100644 index 00000000..0f608b60 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Interface implemented by {@link FlowGraph} implementations. + */ +interface FlowGraphInternal extends FlowGraph { + /** + * Internal method to remove the specified {@link FlowStage} from the graph. + */ + void detach(FlowStage stage); + + /** + * Helper method to connect an outlet to an inlet. + */ + static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) { + if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) { + throw new IllegalArgumentException("Invalid outlet or inlet passed to graph"); + } + + InPort inPort = (InPort) inlet; + OutPort outPort = (OutPort) outlet; + + if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } else if (outPort.input != null || inPort.output != null) { + throw new IllegalStateException("Inlet or outlet already connected"); + } + + outPort.input = inPort; + inPort.output = outPort; + + inPort.connect(); + outPort.connect(); + } + + /** + * Helper method to disconnect an outlet. + */ + static void disconnect(FlowGraph graph, Outlet outlet) { + if (!(outlet instanceof OutPort)) { + throw new IllegalArgumentException("Invalid outlet passed to graph"); + } + + OutPort outPort = (OutPort) outlet; + + if (!graph.equals(outPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } + + outPort.cancel(null); + outPort.complete(); + } + + /** + * Helper method to disconnect an inlet. + */ + static void disconnect(FlowGraph graph, Inlet inlet) { + if (!(inlet instanceof InPort)) { + throw new IllegalArgumentException("Invalid outlet passed to graph"); + } + + InPort inPort = (InPort) inlet; + + if (!graph.equals(inPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } + + inPort.finish(null); + inPort.cancel(null); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java new file mode 100644 index 00000000..4d098043 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java @@ -0,0 +1,303 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link FlowStage} represents a node in a {@link FlowGraph}. + */ +public final class FlowStage { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class); + + /** + * States of the flow stage. + */ + private static final int STAGE_PENDING = 0; // Stage is pending to be started + + private static final int STAGE_ACTIVE = 1; // Stage is actively running + private static final int STAGE_CLOSED = 2; // Stage is closed + private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage + + /** + * Flags of the flow connection + */ + private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated + + private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed + private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active + private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending + + /** + * The flags representing the state and pending actions for the stage. + */ + private int flags = STAGE_PENDING; + + /** + * The deadline of the stage after which an update should run. + */ + long deadline = Long.MAX_VALUE; + + /** + * The index of the timer in the {@link FlowTimerQueue}. + */ + int timerIndex = -1; + + final Clock clock; + private final FlowStageLogic logic; + final FlowGraphInternal parentGraph; + private final FlowEngine engine; + + private final Map inlets = new HashMap<>(); + private final Map outlets = new HashMap<>(); + private int nextInlet = 0; + private int nextOutlet = 0; + + /** + * Construct a new {@link FlowStage} instance. + * + * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param logic The logic of the stage. + */ + FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) { + this.parentGraph = parentGraph; + this.logic = logic; + this.engine = parentGraph.getEngine(); + this.clock = engine.getClock(); + } + + /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** + * Return the {@link Inlet} (an in-going edge) with the specified name for this {@link FlowStage}. + * If an inlet with that name does not exist, a new one is allocated for the stage. + * + * @param name The name of the inlet. + * @return The {@link InPort} representing an {@link Inlet} with the specified name. + */ + public InPort getInlet(String name) { + return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++)); + } + + /** + * Return the {@link Outlet} (an out-going edge) with the specified name for this {@link FlowStage}. + * If an outlet with that name does not exist, a new one is allocated for the stage. + * + * @param name The name of the outlet. + * @return The {@link OutPort} representing an {@link Outlet} with the specified name. + */ + public OutPort getOutlet(String name) { + return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++)); + } + + /** + * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + /** + * Set the deadline of the {@link FlowStage}'s timer. + * + * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted. + */ + public void setDeadline(long deadline) { + this.deadline = deadline; + + if ((flags & STAGE_UPDATE_ACTIVE) == 0) { + // Update the timer queue with the new deadline + engine.scheduleDelayed(this); + } + } + + /** + * Invalidate the {@link FlowStage} forcing the stage to update. + */ + public void invalidate() { + int flags = this.flags; + + if ((flags & STAGE_UPDATE_ACTIVE) == 0) { + scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE); + } + } + + /** + * Close the {@link FlowStage} and disconnect all inlets and outlets. + */ + public void close() { + int flags = this.flags; + + if ((flags & STAGE_STATE) == STAGE_CLOSED) { + return; + } + + // Toggle the close bit. In case no update is active, schedule a new update. + if ((flags & STAGE_UPDATE_ACTIVE) != 0) { + this.flags = flags | STAGE_CLOSE; + } else { + scheduleImmediate(clock.millis(), flags | STAGE_CLOSE); + } + } + + /** + * Update the state of the flow stage. + * + * @param now The current virtual timestamp. + */ + void onUpdate(long now) { + int flags = this.flags; + int state = flags & STAGE_STATE; + + if (state == STAGE_ACTIVE) { + doUpdate(now, flags); + } else if (state == STAGE_PENDING) { + doStart(now, flags); + } + } + + /** + * Invalidate the {@link FlowStage} forcing the stage to update. + * + *

+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to + * prevent having to re-query the clock. This method should not be called during an update. + */ + void invalidate(long now) { + scheduleImmediate(now, flags | STAGE_INVALIDATE); + } + + /** + * Schedule an immediate update for this stage. + */ + private void scheduleImmediate(long now, int flags) { + // In case an immediate update is already scheduled, no need to do anything + if ((flags & STAGE_UPDATE_PENDING) != 0) { + this.flags = flags; + return; + } + + // Mark the stage that there is an update pending + this.flags = flags | STAGE_UPDATE_PENDING; + + engine.scheduleImmediate(now, this); + } + + /** + * Start the stage. + */ + private void doStart(long now, int flags) { + // Update state before calling into the outside world, so it observes a consistent state + flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE; + + doUpdate(now, flags); + } + + /** + * Update the state of the stage. + */ + private void doUpdate(long now, int flags) { + long deadline = this.deadline; + long newDeadline = deadline; + + // Update the stage if: + // (1) the timer of the stage has expired. + // (2) one of the input ports is pushed, + // (3) one of the output ports is pulled, + if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) { + // Update state before calling into the outside world, so it observes a consistent state + this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE; + + try { + newDeadline = logic.onUpdate(this, now); + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = this.flags; + } catch (Exception e) { + doFail(e); + } + } + + // Check whether the stage is marked as closing. + if ((flags & STAGE_CLOSE) != 0) { + doClose(flags, null); + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = this.flags; + } + + // Indicate that no update is active anymore and flush the flags + this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING); + this.deadline = newDeadline; + + // Update the timer queue with the new deadline + engine.scheduleDelayedInContext(this); + } + + /** + * This method is invoked when an uncaught exception is caught by the engine. When this happens, the + * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs. + */ + void doFail(Throwable cause) { + LOGGER.warn("Uncaught exception (closing stage)", cause); + + doClose(flags, cause); + } + + /** + * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure. + */ + private void doClose(int flags, Throwable cause) { + // Mark the stage as closed + this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED; + + // Remove stage from parent graph + parentGraph.detach(this); + + // Remove stage from the timer queue + setDeadline(Long.MAX_VALUE); + + // Cancel all input ports + for (InPort port : inlets.values()) { + if (port != null) { + port.cancel(cause); + } + } + + // Cancel all output ports + for (OutPort port : outlets.values()) { + if (port != null) { + port.fail(cause); + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java new file mode 100644 index 00000000..70986a35 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via + * out-going flows based on its potential inputs. + */ +public interface FlowStageLogic { + /** + * This method is invoked when the one of the stage's inlets or outlets is invalidated. + * + * @param ctx The context in which the stage runs. + * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. + * @return The next deadline for the stage. + */ + long onUpdate(FlowStage ctx, long now); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java new file mode 100644 index 00000000..56ec7702 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.ArrayDeque; +import java.util.Arrays; + +/** + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s + * that have been updated during the engine cycle and should converge. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class FlowStageQueue { + /** + * The array of elements in the queue. + */ + private FlowStage[] elements; + + private int head = 0; + private int tail = 0; + + public FlowStageQueue(int initialCapacity) { + elements = new FlowStage[initialCapacity]; + } + + /** + * Add the specified context to the queue. + */ + void add(FlowStage ctx) { + final FlowStage[] es = elements; + int tail = this.tail; + + es[tail] = ctx; + + tail = inc(tail, es.length); + this.tail = tail; + + if (head == tail) { + doubleCapacity(); + } + } + + /** + * Remove a {@link FlowStage} from the queue or null if the queue is empty. + */ + FlowStage poll() { + final FlowStage[] es = elements; + int head = this.head; + FlowStage ctx = es[head]; + + if (ctx != null) { + es[head] = null; + this.head = inc(head, es.length); + } + + return ctx; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity); + + // Exceptionally, here tail == head needs to be disambiguated + if (tail < head || (tail == head && es[head] != null)) { + // wrap around; slide first leg forward to end of array + int newSpace = newCapacity - oldCapacity; + System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); + for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; + } + } + + /** + * Circularly increments i, mod modulus. + * Precondition and postcondition: 0 <= i < modulus. + */ + private static int inc(int i, int modulus) { + if (++i >= modulus) i = 0; + return i; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java new file mode 100644 index 00000000..4b746202 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.Arrays; + +/** + * A specialized priority queue for timers of {@link FlowStageLogic}s. + *

+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +final class FlowTimerQueue { + /** + * Array representation of binary heap of {@link FlowStage} instances. + */ + private FlowStage[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowTimerQueue(int initialCapacity) { + this.queue = new FlowStage[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + void enqueue(FlowStage ctx) { + FlowStage[] es = queue; + int k = ctx.timerIndex; + + if (ctx.deadline != Long.MAX_VALUE) { + if (k >= 0) { + update(es, ctx, k); + } else { + add(es, ctx); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed now. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed now, otherwise null. + */ + FlowStage poll(long now) { + int size = this.size; + if (size == 0) { + return null; + } + + final FlowStage[] es = queue; + final FlowStage head = es[0]; + + if (now < head.deadline) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowStage next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.timerIndex = -1; + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + long peekDeadline() { + if (size > 0) { + return queue[0].deadline; + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowStage[] es, FlowStage ctx) { + int i = size; + + if (i >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(i, ctx, es); + + size = i + 1; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowStage[] es, FlowStage ctx, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].deadline > ctx.deadline) { + siftUp(k, ctx, es); + return; + } + } + + siftDown(k, ctx, es, size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowStage[] es, int k) { + int s = --size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowStage moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowStage[] grow() { + FlowStage[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowStage key, FlowStage[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowStage e = es[parent]; + if (key.deadline >= e.deadline) break; + es[k] = e; + e.timerIndex = k; + k = parent; + } + es[k] = key; + key.timerIndex = k; + } + + private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowStage c = es[child]; + int right = child + 1; + if (right < n && c.deadline > es[right].deadline) c = es[child = right]; + + if (key.deadline <= c.deadline) break; + + es[k] = c; + c.timerIndex = k; + k = child; + } + + es[k] = key; + key.timerIndex = k; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java new file mode 100644 index 00000000..839b01db --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}. + */ +public interface InHandler { + /** + * Return the actual flow rate over the input port. + * + * @param port The input port to which the flow was pushed. + * @return The actual flow rate over the port. + */ + default float getRate(InPort port) { + return Math.min(port.getDemand(), port.getCapacity()); + } + + /** + * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet. + * + * @param port The input port to which the flow was pushed. + * @param demand The rate of flow the output attempted to push to the port. + */ + void onPush(InPort port, float demand); + + /** + * This method is invoked when the input port is finished. + * + * @param port The input port that has finished. + * @param cause The cause of the input port being finished or null if the port completed successfully. + */ + void onUpstreamFinish(InPort port, Throwable cause); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java new file mode 100644 index 00000000..9d5b4bef --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A collection of common {@link InHandler} implementations. + */ +public class InHandlers { + /** + * Prevent construction of this class. + */ + private InHandlers() {} + + /** + * Return an {@link InHandler} that does nothing. + */ + public static InHandler noop() { + return NoopInHandler.INSTANCE; + } + + /** + * No-op implementation of {@link InHandler}. + */ + private static final class NoopInHandler implements InHandler { + public static final InHandler INSTANCE = new NoopInHandler(); + + @Override + public void onPush(InPort port, float demand) {} + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) {} + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java new file mode 100644 index 00000000..fba12aaf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.Objects; + +/** + * A port that consumes a flow. + *

+ * Input ports are represented as in-going edges in the flow graph. + */ +public final class InPort implements Inlet { + private final int id; + + private float capacity; + private float demand; + + private boolean mask; + + OutPort output; + private InHandler handler = InHandlers.noop(); + private final Clock clock; + private final String name; + private final FlowStage stage; + + InPort(FlowStage stage, String name, int id) { + this.name = name; + this.id = id; + this.stage = stage; + this.clock = stage.clock; + } + + @Override + public FlowGraph getGraph() { + return stage.parentGraph; + } + + @Override + public String getName() { + return name; + } + + /** + * Return the identifier of the {@link InPort} with respect to its stage. + */ + public int getId() { + return id; + } + + /** + * Return the current capacity of the input port. + */ + public float getCapacity() { + return capacity; + } + + /** + * Return the current demand of flow of the input port. + */ + public float getDemand() { + return demand; + } + + /** + * Return the current rate of flow of the input port. + */ + public float getRate() { + return handler.getRate(this); + } + + /** + * Pull the flow with the specified capacity from the input port. + * + * @param capacity The maximum throughput that the stage can receive from the input port. + */ + public void pull(float capacity) { + this.capacity = capacity; + + OutPort output = this.output; + if (output != null) { + output.pull(capacity); + } + } + + /** + * Return the current {@link InHandler} of the input port. + */ + public InHandler getHandler() { + return handler; + } + + /** + * Set the {@link InHandler} of the input port. + */ + public void setHandler(InHandler handler) { + this.handler = handler; + } + + /** + * Return the mask of this port. + *

+ * Stages ignore events originating from masked ports. + */ + public boolean getMask() { + return mask; + } + + /** + * (Un)mask the port. + */ + public void setMask(boolean mask) { + this.mask = mask; + } + + /** + * Disconnect the input port from its (potentially) connected outlet. + *

+ * The inlet can still be used and re-connected to another outlet. + * + * @param cause The cause for disconnecting the port or null when no more flow is needed. + */ + public void cancel(Throwable cause) { + demand = 0.f; + + OutPort output = this.output; + if (output != null) { + this.output = null; + output.input = null; + output.cancel(cause); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + InPort port = (InPort) o; + return stage.equals(port.stage) && name.equals(port.name); + } + + @Override + public int hashCode() { + return Objects.hash(stage.parentGraph, name); + } + + /** + * This method is invoked when the inlet is connected to an outlet. + */ + void connect() { + OutPort output = this.output; + output.pull(capacity); + } + + /** + * Push a flow from an outlet to this inlet. + * + * @param demand The rate of flow to push. + */ + void push(float demand) { + // No-op when the rate is unchanged + if (this.demand == demand) { + return; + } + + try { + handler.onPush(this, demand); + this.demand = demand; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } + + /** + * This method is invoked by the connected {@link OutPort} when it finishes. + */ + void finish(Throwable cause) { + try { + long now = clock.millis(); + handler.onUpstreamFinish(this, cause); + this.demand = 0.f; + + if (!mask) { + stage.invalidate(now); + } + } catch (Exception e) { + stage.doFail(e); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java new file mode 100644 index 00000000..4a9ea6a5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * An in-going edge in a {@link FlowGraph}. + */ +public interface Inlet { + /** + * Return the {@link FlowGraph} to which the inlet is exposed. + */ + FlowGraph getGraph(); + + /** + * Return the name of the inlet. + */ + String getName(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java new file mode 100644 index 00000000..a5b5114b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.Arrays; + +/** + * A specialized monotonic stack implementation for tracking the scheduled engine invocations. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class InvocationStack { + /** + * The array of elements in the stack. + */ + private long[] elements; + + private int head = -1; + + public InvocationStack(int initialCapacity) { + elements = new long[initialCapacity]; + Arrays.fill(elements, Long.MIN_VALUE); + } + + /** + * Try to add the specified invocation to the monotonic stack. + * + * @param invocation The timestamp of the invocation. + * @return true if the invocation was added, false otherwise. + */ + boolean tryAdd(long invocation) { + final long[] es = elements; + int head = this.head; + + if (head < 0 || es[head] > invocation) { + es[head + 1] = invocation; + this.head = head + 1; + + if (head + 2 == es.length) { + doubleCapacity(); + } + + return true; + } + + return false; + } + + /** + * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. + */ + long poll() { + final long[] es = elements; + int head = this.head--; + + if (head >= 0) { + return es[head]; + } + + return Long.MAX_VALUE; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + elements = Arrays.copyOf(elements, newCapacity); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java new file mode 100644 index 00000000..723c6d6b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}. + */ +public interface OutHandler { + /** + * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet. + * + * @param port The output port of which the capacity was changed. + * @param capacity The new capacity of the outlet. + */ + void onPull(OutPort port, float capacity); + + /** + * This method is invoked when the output port no longer accepts any flow. + *

+ * After this callback no other callbacks will be called for this port. + * + * @param port The outlet that no longer accepts any flow. + * @param cause The cause of the output port no longer accepting any flow or null if the port closed + * successfully. + */ + void onDownstreamFinish(OutPort port, Throwable cause); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java new file mode 100644 index 00000000..8fbfda0d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A collection of common {@link OutHandler} implementations. + */ +public class OutHandlers { + /** + * Prevent construction of this class. + */ + private OutHandlers() {} + + /** + * Return an {@link OutHandler} that does nothing. + */ + public static OutHandler noop() { + return NoopOutHandler.INSTANCE; + } + + /** + * No-op implementation of {@link OutHandler}. + */ + private static final class NoopOutHandler implements OutHandler { + public static final OutHandler INSTANCE = new NoopOutHandler(); + + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) {} + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java new file mode 100644 index 00000000..332296a0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.Objects; + +/** + * A port that outputs a flow. + *

+ * Output ports are represented as out-going edges in the flow graph. + */ +public final class OutPort implements Outlet { + private final int id; + + private float capacity; + private float demand; + + private boolean mask; + + InPort input; + private OutHandler handler = OutHandlers.noop(); + private final String name; + private final FlowStage stage; + private final Clock clock; + + OutPort(FlowStage stage, String name, int id) { + this.name = name; + this.id = id; + this.stage = stage; + this.clock = stage.clock; + } + + @Override + public FlowGraph getGraph() { + return stage.parentGraph; + } + + @Override + public String getName() { + return name; + } + + /** + * Return the identifier of the {@link OutPort} with respect to its stage. + */ + public int getId() { + return id; + } + + /** + * Return the capacity of the output port. + */ + public float getCapacity() { + return capacity; + } + + /** + * Return the current demand of flow of the output port. + */ + public float getDemand() { + return demand; + } + + /** + * Return the current rate of flow of the input port. + */ + public float getRate() { + InPort input = this.input; + if (input != null) { + return input.getRate(); + } + + return 0.f; + } + + /** + * Return the current {@link OutHandler} of the output port. + */ + public OutHandler getHandler() { + return handler; + } + + /** + * Set the {@link OutHandler} of the output port. + */ + public void setHandler(OutHandler handler) { + this.handler = handler; + } + + /** + * Return the mask of this port. + *

+ * Stages ignore events originating from masked ports. + */ + public boolean getMask() { + return mask; + } + + /** + * (Un)mask the port. + */ + public void setMask(boolean mask) { + this.mask = mask; + } + + /** + * Push the given flow rate over output port. + * + * @param rate The rate of the flow to push. + */ + public void push(float rate) { + demand = rate; + InPort input = this.input; + + if (input != null) { + input.push(rate); + } + } + + /** + * Signal to the downstream port that the output has completed successfully and disconnect the port from its input. + *

+ * The output port can still be used and re-connected to another input. + */ + public void complete() { + fail(null); + } + + /** + * Signal a failure to the downstream port and disconnect the port from its input. + *

+ * The output can still be used and re-connected to another input. + */ + public void fail(Throwable cause) { + capacity = 0.f; + + InPort input = this.input; + if (input != null) { + this.input = null; + input.output = null; + input.finish(cause); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OutPort port = (OutPort) o; + return stage.equals(port.stage) && name.equals(port.name); + } + + @Override + public int hashCode() { + return Objects.hash(stage.parentGraph, name); + } + + /** + * This method is invoked when the outlet is connected to an inlet. + */ + void connect() { + input.push(demand); + } + + /** + * Pull from this outlet with a specified capacity. + * + * @param capacity The capacity of the inlet. + */ + void pull(float capacity) { + // No-op when outlet is not active or the rate is unchanged + if (this.capacity == capacity) { + return; + } + + try { + handler.onPull(this, capacity); + this.capacity = capacity; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } + + /** + * This method is invoked by the connected {@link InPort} when downstream cancels the connection. + */ + void cancel(Throwable cause) { + try { + handler.onDownstreamFinish(this, cause); + this.capacity = 0.f; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java new file mode 100644 index 00000000..32e19a3b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * An out-going edge in a {@link FlowGraph}. + */ +public interface Outlet { + /** + * Return the {@link FlowGraph} to which the outlet is exposed. + */ + FlowGraph getGraph(); + + /** + * Return the name of the outlet. + */ + String getName(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java new file mode 100644 index 00000000..1a99d0cf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs. + */ +public interface FlowMultiplexer { + /** + * Return the number of active inputs on this multiplexer. + */ + int getInputCount(); + + /** + * Allocate a new input on this multiplexer with the specified capacity.. + * + * @return The identifier of the input for this stage. + */ + Inlet newInput(); + + /** + * Release the input at the specified slot. + * + * @param inlet The inlet to release. + */ + void releaseInput(Inlet inlet); + + /** + * Return the number of active outputs on this multiplexer. + */ + int getOutputCount(); + + /** + * Allocate a new output on this multiplexer. + * + * @return The outlet for this stage. + */ + Outlet newOutput(); + + /** + * Release the output at the specified slot. + * + * @param outlet The outlet to release. + */ + void releaseOutput(Outlet outlet); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java new file mode 100644 index 00000000..ca0639f5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.Arrays; +import java.util.BitSet; + +/** + * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs + * using max-min fair sharing. + *

+ * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined + * output capacity, but allows individual inputs to use more capacity if there is still capacity left. + */ +public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + private final FlowStage stage; + private final BitSet activeInputs; + private final BitSet activeOutputs; + + private float capacity = 0.f; + private float demand = 0.f; + private float rate = 0.f; + + private InPort[] inlets; + private long[] inputs; + private float[] rates; + private OutPort[] outlets; + + private final MultiplexerInHandler inHandler = new MultiplexerInHandler(); + private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler(); + + /** + * Construct a {@link MaxMinFlowMultiplexer} instance. + * + * @param graph The {@link FlowGraph} to add the multiplexer to. + */ + public MaxMinFlowMultiplexer(FlowGraph graph) { + this.stage = graph.newStage(this); + this.activeInputs = new BitSet(); + this.activeOutputs = new BitSet(); + + this.inlets = new InPort[4]; + this.inputs = new long[4]; + this.rates = new float[4]; + this.outlets = new OutPort[4]; + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + float capacity = this.capacity; + float demand = this.demand; + float rate = demand; + + if (demand > capacity) { + rate = redistributeCapacity(inlets, inputs, rates, capacity); + } + + if (this.rate != rate) { + // Only update the outputs if the output rate has changed + this.rate = rate; + + changeRate(activeOutputs, outlets, capacity, rate); + } + + return Long.MAX_VALUE; + } + + @Override + public int getInputCount() { + return activeInputs.length(); + } + + @Override + public Inlet newInput() { + final BitSet activeInputs = this.activeInputs; + int slot = activeInputs.nextClearBit(0); + + InPort port = stage.getInlet("in" + slot); + port.setHandler(inHandler); + port.pull(this.capacity); + + InPort[] inlets = this.inlets; + if (slot >= inlets.length) { + int newLength = inlets.length + (inlets.length >> 1); + inlets = Arrays.copyOf(inlets, newLength); + inputs = Arrays.copyOf(inputs, newLength); + rates = Arrays.copyOf(rates, newLength); + this.inlets = inlets; + } + inlets[slot] = port; + + activeInputs.set(slot); + return port; + } + + @Override + public void releaseInput(Inlet inlet) { + InPort port = (InPort) inlet; + + activeInputs.clear(port.getId()); + port.cancel(null); + } + + @Override + public int getOutputCount() { + return activeOutputs.length(); + } + + @Override + public Outlet newOutput() { + final BitSet activeOutputs = this.activeOutputs; + int slot = activeOutputs.nextClearBit(0); + + OutPort port = stage.getOutlet("out" + slot); + port.setHandler(outHandler); + + OutPort[] outlets = this.outlets; + if (slot >= outlets.length) { + int newLength = outlets.length + (outlets.length >> 1); + outlets = Arrays.copyOf(outlets, newLength); + this.outlets = outlets; + } + outlets[slot] = port; + + activeOutputs.set(slot); + return port; + } + + @Override + public void releaseOutput(Outlet outlet) { + OutPort port = (OutPort) outlet; + activeInputs.clear(port.getId()); + port.complete(); + } + + /** + * Helper function to redistribute the specified capacity across the inlets. + */ + private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) { + // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the + // constrained capacity across the inputs. + for (int i = 0; i < inputs.length; i++) { + InPort inlet = inlets[i]; + if (inlet == null) { + break; + } + + inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL); + } + Arrays.sort(inputs); + + float availableCapacity = capacity; + int inputSize = inputs.length; + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + for (int i = 0; i < inputs.length; i++) { + long v = inputs[i]; + int slot = (int) v; + float d = Float.intBitsToFloat((int) (v >> 32)); + + if (d == 0.0) { + continue; + } + + float availableShare = availableCapacity / (inputSize - i); + float r = Math.min(d, availableShare); + + rates[slot] = r; + availableCapacity -= r; + } + + return capacity - availableCapacity; + } + + /** + * Helper method to change the rate of the outlets. + */ + private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) { + // Divide the requests over the available capacity of the input resources fairly + for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { + OutPort outlet = outlets[i]; + float fraction = outlet.getCapacity() / capacity; + outlet.push(rate * fraction); + } + } + + /** + * A {@link InHandler} implementation for the multiplexer inputs. + */ + private class MultiplexerInHandler implements InHandler { + @Override + public float getRate(InPort port) { + return rates[port.getId()]; + } + + @Override + public void onPush(InPort port, float demand) { + MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand; + rates[port.getId()] = demand; + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + MaxMinFlowMultiplexer.this.demand -= port.getDemand(); + releaseInput(port); + rates[port.getId()] = 0.f; + } + } + + /** + * A {@link OutHandler} implementation for the multiplexer outputs. + */ + private class MultiplexerOutHandler implements OutHandler { + @Override + public void onPull(OutPort port, float capacity) { + float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity; + MaxMinFlowMultiplexer.this.capacity = newCapacity; + changeInletCapacity(newCapacity); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity(); + MaxMinFlowMultiplexer.this.capacity = newCapacity; + releaseOutput(port); + changeInletCapacity(newCapacity); + } + + private void changeInletCapacity(float capacity) { + BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs; + InPort[] inlets = MaxMinFlowMultiplexer.this.inlets; + + for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) { + inlets[i].pull(capacity); + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java new file mode 100644 index 00000000..69c94708 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink; + +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.Inlet; + +/** + * A {@link FlowStage} with a single input. + */ +public interface FlowSink { + /** + * Return the input of this {@link FlowSink}. + */ + Inlet getInput(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java new file mode 100644 index 00000000..fdfe5ee8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; + +/** + * A sink with a fixed capacity. + */ +public final class SimpleFlowSink implements FlowSink, FlowStageLogic { + private final FlowStage stage; + private final InPort input; + private final Handler handler; + + /** + * Construct a new {@link SimpleFlowSink} with the specified initial capacity. + * + * @param graph The graph to add the sink to. + * @param initialCapacity The initial capacity of the sink. + */ + public SimpleFlowSink(FlowGraph graph, float initialCapacity) { + this.stage = graph.newStage(this); + this.handler = new Handler(); + this.input = stage.getInlet("in"); + this.input.pull(initialCapacity); + this.input.setMask(true); + this.input.setHandler(handler); + } + + /** + * Return the {@link Inlet} of this sink. + */ + @Override + public Inlet getInput() { + return input; + } + + /** + * Return the capacity of the sink. + */ + public float getCapacity() { + return input.getCapacity(); + } + + /** + * Update the capacity of the sink. + * + * @param capacity The new capacity to update the sink to. + */ + public void setCapacity(float capacity) { + input.pull(capacity); + stage.invalidate(); + } + + /** + * Return the flow rate of the sink. + */ + public float getRate() { + return input.getRate(); + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + InPort input = this.input; + handler.rate = Math.min(input.getDemand(), input.getCapacity()); + return Long.MAX_VALUE; + } + + /** + * The {@link InHandler} implementation for the sink. + */ + private static final class Handler implements InHandler { + float rate; + + @Override + public float getRate(InPort port) { + return rate; + } + + @Override + public void onPush(InPort port, float demand) { + float capacity = port.getCapacity(); + rate = Math.min(demand, capacity); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + rate = 0.f; + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java new file mode 100644 index 00000000..2dcc66e4 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * An empty {@link FlowSource}. + */ +public final class EmptyFlowSource implements FlowSource, FlowStageLogic { + private final FlowStage stage; + private final OutPort output; + + /** + * Construct a new {@link EmptyFlowSource}. + */ + public EmptyFlowSource(FlowGraph graph) { + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java new file mode 100644 index 00000000..f9432c33 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowStage} with a single output. + */ +public interface FlowSource { + /** + * Return the output of this {@link FlowSource}. + */ + Outlet getOutput(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java new file mode 100644 index 00000000..a237c81e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.function.Consumer; + +/** + * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization. + */ +public class RuntimeFlowSource implements FlowSource, FlowStageLogic { + private final float utilization; + + private final FlowStage stage; + private final OutPort output; + private final Consumer completionHandler; + + private long duration; + private long lastPull; + + /** + * Construct a {@link RuntimeFlowSource} instance. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param duration The duration of the source. + * @param utilization The utilization of the capacity of the outlet. + * @param completionHandler A callback invoked when the source completes. + */ + public RuntimeFlowSource( + FlowGraph graph, long duration, float utilization, Consumer completionHandler) { + if (duration <= 0) { + throw new IllegalArgumentException("Duration must be positive and non-zero"); + } + + if (utilization <= 0.0) { + throw new IllegalArgumentException("Utilization must be positive and non-zero"); + } + + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(RuntimeFlowSource.this); + } + }); + this.duration = duration; + this.utilization = utilization; + this.completionHandler = completionHandler; + this.lastPull = graph.getEngine().getClock().millis(); + } + + /** + * Construct a new {@link RuntimeFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param duration The duration of the source. + * @param utilization The utilization of the capacity of the outlet. + */ + public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) { + this(graph, duration, utilization, RuntimeFlowSource::close); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + long lastPull = this.lastPull; + this.lastPull = now; + + long delta = Math.max(0, now - lastPull); + + OutPort output = this.output; + float limit = output.getCapacity() * utilization; + long duration = this.duration - delta; + + if (duration <= 0) { + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + this.duration = duration; + output.push(limit); + return now + duration; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java new file mode 100644 index 00000000..764a20a8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.function.Consumer; + +/** + * A flow source that contains a fixed amount and is pushed with a given utilization. + */ +public final class SimpleFlowSource implements FlowSource, FlowStageLogic { + private final float utilization; + private float remainingAmount; + private long lastPull; + + private final FlowStage stage; + private final OutPort output; + private final Consumer completionHandler; + + /** + * Construct a new {@link SimpleFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param amount The amount to transfer via the outlet. + * @param utilization The utilization of the capacity of the outlet. + * @param completionHandler A callback invoked when the source completes. + */ + public SimpleFlowSource( + FlowGraph graph, float amount, float utilization, Consumer completionHandler) { + if (amount < 0.0) { + throw new IllegalArgumentException("Amount must be non-negative"); + } + + if (utilization <= 0.0) { + throw new IllegalArgumentException("Utilization must be positive and non-zero"); + } + + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(SimpleFlowSource.this); + } + }); + this.completionHandler = completionHandler; + this.utilization = utilization; + this.remainingAmount = amount; + this.lastPull = graph.getEngine().getClock().millis(); + } + + /** + * Construct a new {@link SimpleFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param amount The amount to transfer via the outlet. + * @param utilization The utilization of the capacity of the outlet. + */ + public SimpleFlowSource(FlowGraph graph, float amount, float utilization) { + this(graph, amount, utilization, SimpleFlowSource::close); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + long lastPull = this.lastPull; + this.lastPull = now; + + long delta = Math.max(0, now - lastPull); + + OutPort output = this.output; + float consumed = output.getRate() * delta / 1000.f; + float limit = output.getCapacity() * utilization; + + float remainingAmount = this.remainingAmount - consumed; + this.remainingAmount = remainingAmount; + + long duration = (long) Math.ceil(remainingAmount / limit * 1000); + + if (duration <= 0) { + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + output.push(limit); + return now + duration; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java new file mode 100644 index 00000000..96d43aef --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.function.Consumer; + +/** + * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time. + */ +public final class TraceFlowSource implements FlowSource, FlowStageLogic { + private final OutPort output; + private final long[] deadlines; + private final float[] usages; + private final int size; + private int index; + + private final FlowStage stage; + private final Consumer completionHandler; + + /** + * Construct a {@link TraceFlowSource}. + * + * @param graph The {@link FlowGraph} to which the source belongs. + * @param trace The {@link Trace} to replay. + * @param completionHandler The completion handler to invoke when the source finishes. + */ + public TraceFlowSource(FlowGraph graph, Trace trace, Consumer completionHandler) { + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(TraceFlowSource.this); + } + }); + this.deadlines = trace.deadlines; + this.usages = trace.usages; + this.size = trace.size; + this.completionHandler = completionHandler; + } + + /** + * Construct a {@link TraceFlowSource}. + * + * @param graph The {@link FlowGraph} to which the source belongs. + * @param trace The {@link Trace} to replay. + */ + public TraceFlowSource(FlowGraph graph, Trace trace) { + this(graph, trace, TraceFlowSource::close); + } + + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + int size = this.size; + int index = this.index; + long[] deadlines = this.deadlines; + long deadline; + + do { + deadline = deadlines[index]; + } while (deadline <= now && ++index < size); + + if (index >= size) { + output.push(0.0f); + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + this.index = index; + float usage = usages[index]; + output.push(usage); + + return deadline; + } + + /** + * A trace describes the workload over time. + */ + public static final class Trace { + private final long[] deadlines; + private final float[] usages; + private final int size; + + /** + * Construct a {@link Trace}. + * + * @param deadlines The deadlines of the trace fragments. + * @param usages The usages of the trace fragments. + * @param size The size of the trace. + */ + public Trace(long[] deadlines, float[] usages, int size) { + this.deadlines = deadlines; + this.usages = usages; + this.size = size; + } + + public long[] getDeadlines() { + return deadlines; + } + + public float[] getUsages() { + return usages; + } + + public int getSize() { + return size; + } + } +} -- cgit v1.2.3 From e4f4e1c4ebd02278dc1c24ee481357989af6abe5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Sep 2022 12:02:06 +0200 Subject: feat(sim/flow): Support flow transformations This change adds a new component, FlowTransform, which is able to transform the flow from one component to another, based on some inversible transformation. Such as component is useful when converting between different units of flow between different components. --- .../opendc/simulator/flow2/util/FlowTransform.java | 41 +++++++ .../simulator/flow2/util/FlowTransformer.java | 124 +++++++++++++++++++++ .../simulator/flow2/util/FlowTransforms.java | 57 ++++++++++ 3 files changed, 222 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java new file mode 100644 index 00000000..51ea7df3 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.util; + +import org.opendc.simulator.flow2.FlowGraph; + +/** + * A {@link FlowTransform} describes a transformation between two components in a {@link FlowGraph} that might operate + * at different units of flow. + */ +public interface FlowTransform { + /** + * Apply the transform to the specified flow rate. + */ + float apply(float value); + + /** + * Apply the inverse of the transformation to the specified flow rate. + */ + float applyInverse(float value); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java new file mode 100644 index 00000000..852240d8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.util; + +import org.opendc.simulator.flow2.*; +import org.opendc.simulator.flow2.sink.FlowSink; +import org.opendc.simulator.flow2.source.FlowSource; + +/** + * Helper class to transform flow from outlet to inlet. + */ +public final class FlowTransformer implements FlowStageLogic, FlowSource, FlowSink { + private final FlowStage stage; + private final InPort input; + private final OutPort output; + + /** + * Construct a new {@link FlowTransformer}. + */ + public FlowTransformer(FlowGraph graph, FlowTransform transform) { + this.stage = graph.newStage(this); + this.input = stage.getInlet("in"); + this.output = stage.getOutlet("out"); + + this.input.setHandler(new ForwardInHandler(output, transform)); + this.input.setMask(true); + this.output.setHandler(new ForwardOutHandler(input, transform)); + this.output.setMask(true); + } + + /** + * Return the {@link Outlet} of the transformer. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Return the {@link Inlet} of the transformer. + */ + @Override + public Inlet getInput() { + return input; + } + + /** + * Close the transformer. + */ + void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } + + private static class ForwardInHandler implements InHandler { + private final OutPort output; + private final FlowTransform transform; + + ForwardInHandler(OutPort output, FlowTransform transform) { + this.output = output; + this.transform = transform; + } + + @Override + public float getRate(InPort port) { + return transform.applyInverse(output.getRate()); + } + + @Override + public void onPush(InPort port, float demand) { + float rate = transform.apply(demand); + output.push(rate); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + output.fail(cause); + } + } + + private static class ForwardOutHandler implements OutHandler { + private final InPort input; + private final FlowTransform transform; + + ForwardOutHandler(InPort input, FlowTransform transform) { + this.input = input; + this.transform = transform; + } + + @Override + public void onPull(OutPort port, float capacity) { + input.pull(transform.applyInverse(capacity)); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + input.cancel(cause); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java new file mode 100644 index 00000000..428dbfca --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.util; + +/** + * A collection of common {@link FlowTransform} implementations. + */ +public class FlowTransforms { + /** + * Prevent construction of this class. + */ + private FlowTransforms() {} + + /** + * Return a {@link FlowTransform} that forwards the flow rate unmodified. + */ + public static FlowTransform noop() { + return NoopFlowTransform.INSTANCE; + } + + /** + * No-op implementation of a {@link FlowTransform}. + */ + private static final class NoopFlowTransform implements FlowTransform { + static final NoopFlowTransform INSTANCE = new NoopFlowTransform(); + + @Override + public float apply(float value) { + return value; + } + + @Override + public float applyInverse(float value) { + return value; + } + } +} -- cgit v1.2.3 From 66a5266c2e6f060a11fffef1e9eed9e716056853 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Sep 2022 17:10:52 +0200 Subject: feat(sim/flow): Add forwarding flow multiplexer This change implements a new `FlowMultiplexer` that forwards the inputs directly to one of the pre-allocated outputs. --- .../flow2/mux/ForwardingFlowMultiplexer.java | 233 +++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java new file mode 100644 index 00000000..9ff6a4c8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import java.util.Arrays; +import java.util.BitSet; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively. + * This means that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + */ +public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler(); + public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler(); + + private final FlowStage stage; + + private InPort[] inlets; + private OutPort[] outlets; + private final BitSet activeInputs; + private final BitSet activeOutputs; + private final BitSet availableOutputs; + + public ForwardingFlowMultiplexer(FlowGraph graph) { + this.stage = graph.newStage(this); + + this.inlets = new InPort[4]; + this.activeInputs = new BitSet(); + this.outlets = new OutPort[4]; + this.activeOutputs = new BitSet(); + this.availableOutputs = new BitSet(); + } + + @Override + public int getInputCount() { + return activeInputs.length(); + } + + @Override + public Inlet newInput() { + final BitSet activeInputs = this.activeInputs; + int slot = activeInputs.nextClearBit(0); + + InPort inPort = stage.getInlet("in" + slot); + inPort.setMask(true); + + InPort[] inlets = this.inlets; + if (slot >= inlets.length) { + int newLength = inlets.length + (inlets.length >> 1); + inlets = Arrays.copyOf(inlets, newLength); + this.inlets = inlets; + } + + final BitSet availableOutputs = this.availableOutputs; + int outSlot = availableOutputs.nextSetBit(0); + + if (outSlot < 0) { + throw new IllegalStateException("No capacity available for a new input"); + } + + inlets[slot] = inPort; + activeInputs.set(slot); + + OutPort outPort = outlets[outSlot]; + availableOutputs.clear(outSlot); + + inPort.setHandler(new ForwardingInHandler(outPort)); + outPort.setHandler(new ForwardingOutHandler(inPort)); + + inPort.pull(outPort.getCapacity()); + + return inPort; + } + + @Override + public void releaseInput(Inlet inlet) { + InPort port = (InPort) inlet; + int slot = port.getId(); + + final BitSet activeInputs = this.activeInputs; + + if (!activeInputs.get(slot)) { + return; + } + + port.cancel(null); + activeInputs.clear(slot); + + ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler(); + availableOutputs.set(inHandler.output.getId()); + + port.setHandler(IDLE_IN_HANDLER); + } + + @Override + public int getOutputCount() { + return activeOutputs.length(); + } + + @Override + public Outlet newOutput() { + final BitSet activeOutputs = this.activeOutputs; + int slot = activeOutputs.nextClearBit(0); + + OutPort port = stage.getOutlet("out" + slot); + OutPort[] outlets = this.outlets; + if (slot >= outlets.length) { + int newLength = outlets.length + (outlets.length >> 1); + outlets = Arrays.copyOf(outlets, newLength); + this.outlets = outlets; + } + outlets[slot] = port; + + activeOutputs.set(slot); + availableOutputs.set(slot); + return port; + } + + @Override + public void releaseOutput(Outlet outlet) { + OutPort port = (OutPort) outlet; + int slot = port.getId(); + activeInputs.clear(slot); + availableOutputs.clear(slot); + port.complete(); + + port.setHandler(IDLE_OUT_HANDLER); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } + + class ForwardingInHandler implements InHandler { + final OutPort output; + + ForwardingInHandler(OutPort output) { + this.output = output; + } + + @Override + public float getRate(InPort port) { + return output.getRate(); + } + + @Override + public void onPush(InPort port, float rate) { + output.push(rate); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + final OutPort output = this.output; + output.push(0.f); + + releaseInput(port); + } + } + + private class ForwardingOutHandler implements OutHandler { + private final InPort input; + + ForwardingOutHandler(InPort input) { + this.input = input; + } + + @Override + public void onPull(OutPort port, float capacity) { + input.pull(capacity); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + input.cancel(cause); + + releaseOutput(port); + } + } + + private static class IdleInHandler implements InHandler { + @Override + public float getRate(InPort port) { + return 0.f; + } + + @Override + public void onPush(InPort port, float rate) { + port.cancel(new IllegalStateException("Inlet is not allocated")); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) {} + } + + private static class IdleOutHandler implements OutHandler { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) {} + } +} -- cgit v1.2.3 From 5abcbaa672d029fb390156a83c29d8d47a215f4f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 12 Oct 2022 15:02:10 +0200 Subject: feat(sim/flow): Expose metrics on FlowMultiplexer --- .../simulator/flow2/mux/FlowMultiplexer.java | 15 ++++++++++ .../flow2/mux/ForwardingFlowMultiplexer.java | 32 ++++++++++++++++++++++ .../simulator/flow2/mux/MaxMinFlowMultiplexer.java | 15 ++++++++++ 3 files changed, 62 insertions(+) (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java index 1a99d0cf..2a23b039 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java @@ -67,4 +67,19 @@ public interface FlowMultiplexer { * @param outlet The outlet to release. */ void releaseOutput(Outlet outlet); + + /** + * Return the total input capacity of the {@link FlowMultiplexer}. + */ + float getCapacity(); + + /** + * Return the total input demand for the {@link FlowMultiplexer}. + */ + float getDemand(); + + /** + * Return the total input rate for the {@link FlowMultiplexer}. + */ + float getRate(); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java index 9ff6a4c8..6394c3fd 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java @@ -51,6 +51,9 @@ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowSta private final BitSet activeOutputs; private final BitSet availableOutputs; + private float capacity = 0.f; + private float demand = 0.f; + public ForwardingFlowMultiplexer(FlowGraph graph) { this.stage = graph.newStage(this); @@ -61,6 +64,27 @@ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowSta this.availableOutputs = new BitSet(); } + @Override + public float getCapacity() { + return capacity; + } + + @Override + public float getDemand() { + return demand; + } + + @Override + public float getRate() { + final BitSet activeOutputs = this.activeOutputs; + final OutPort[] outlets = this.outlets; + float rate = 0.f; + for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { + rate += outlets[i].getRate(); + } + return rate; + } + @Override public int getInputCount() { return activeInputs.length(); @@ -176,11 +200,15 @@ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowSta @Override public void onPush(InPort port, float rate) { + ForwardingFlowMultiplexer.this.demand += -port.getDemand() + rate; + output.push(rate); } @Override public void onUpstreamFinish(InPort port, Throwable cause) { + ForwardingFlowMultiplexer.this.demand -= port.getDemand(); + final OutPort output = this.output; output.push(0.f); @@ -197,11 +225,15 @@ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowSta @Override public void onPull(OutPort port, float capacity) { + ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity; + input.pull(capacity); } @Override public void onDownstreamFinish(OutPort port, Throwable cause) { + ForwardingFlowMultiplexer.this.capacity -= port.getCapacity(); + input.cancel(cause); releaseOutput(port); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java index ca0639f5..77922066 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java @@ -75,6 +75,21 @@ public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLo this.outlets = new OutPort[4]; } + @Override + public float getCapacity() { + return capacity; + } + + @Override + public float getDemand() { + return demand; + } + + @Override + public float getRate() { + return rate; + } + @Override public long onUpdate(FlowStage ctx, long now) { float capacity = this.capacity; -- cgit v1.2.3 From 44215bd668c5fa3efe2f57fc577824478b00af57 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Sep 2022 14:38:34 +0200 Subject: refactor(sim/compute): Re-implement using flow2 This change re-implements the OpenDC compute simulator framework using the new flow2 framework for modelling multi-edge flow networks. The re-implementation is written in Java and focusses on performance and clean API surface. --- .../simulator/flow2/mux/FlowMultiplexer.java | 10 +++++ .../flow2/mux/FlowMultiplexerFactory.java | 51 ++++++++++++++++++++++ .../flow2/mux/ForwardingFlowMultiplexer.java | 15 +++++++ .../simulator/flow2/mux/MaxMinFlowMultiplexer.java | 20 +++++++-- .../simulator/flow2/source/RuntimeFlowSource.java | 5 +-- .../simulator/flow2/source/SimpleFlowSource.java | 5 +-- .../simulator/flow2/source/TraceFlowSource.java | 3 +- 7 files changed, 98 insertions(+), 11 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java/org') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java index 2a23b039..dec98955 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java @@ -30,6 +30,16 @@ import org.opendc.simulator.flow2.Outlet; * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs. */ public interface FlowMultiplexer { + /** + * Return maximum number of inputs supported by the multiplexer. + */ + int getMaxInputs(); + + /** + * Return maximum number of outputs supported by the multiplexer. + */ + int getMaxOutputs(); + /** * Return the number of active inputs on this multiplexer. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java new file mode 100644 index 00000000..0b5b9141 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowGraph; + +/** + * Factory interface for a {@link FlowMultiplexer} implementation. + */ +public interface FlowMultiplexerFactory { + /** + * Construct a new {@link FlowMultiplexer} belonging to the specified {@link FlowGraph}. + * + * @param graph The graph to which the multiplexer belongs. + */ + FlowMultiplexer newMultiplexer(FlowGraph graph); + + /** + * Return a {@link FlowMultiplexerFactory} for {@link ForwardingFlowMultiplexer} instances. + */ + static FlowMultiplexerFactory forwardingMultiplexer() { + return ForwardingFlowMultiplexer.FACTORY; + } + + /** + * Return a {@link FlowMultiplexerFactory} for {@link MaxMinFlowMultiplexer} instances. + */ + static FlowMultiplexerFactory maxMinMultiplexer() { + return MaxMinFlowMultiplexer.FACTORY; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java index 6394c3fd..abe3510b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java @@ -40,6 +40,11 @@ import org.opendc.simulator.flow2.Outlet; * inputs as outputs. */ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + /** + * Factory implementation for this implementation. + */ + static FlowMultiplexerFactory FACTORY = ForwardingFlowMultiplexer::new; + public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler(); public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler(); @@ -85,6 +90,16 @@ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowSta return rate; } + @Override + public int getMaxInputs() { + return getOutputCount(); + } + + @Override + public int getMaxOutputs() { + return Integer.MAX_VALUE; + } + @Override public int getInputCount() { return activeInputs.length(); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java index 77922066..ac5c4f5c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java @@ -22,6 +22,8 @@ package org.opendc.simulator.flow2.mux; +import java.util.Arrays; +import java.util.BitSet; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -32,9 +34,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.Arrays; -import java.util.BitSet; - /** * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs * using max-min fair sharing. @@ -43,6 +42,11 @@ import java.util.BitSet; * output capacity, but allows individual inputs to use more capacity if there is still capacity left. */ public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + /** + * Factory implementation for this implementation. + */ + static FlowMultiplexerFactory FACTORY = MaxMinFlowMultiplexer::new; + private final FlowStage stage; private final BitSet activeInputs; private final BitSet activeOutputs; @@ -90,6 +94,16 @@ public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLo return rate; } + @Override + public int getMaxInputs() { + return Integer.MAX_VALUE; + } + + @Override + public int getMaxOutputs() { + return Integer.MAX_VALUE; + } + @Override public long onUpdate(FlowStage ctx, long now) { float capacity = this.capacity; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java index a237c81e..c09987cd 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java @@ -22,6 +22,7 @@ package org.opendc.simulator.flow2.source; +import java.util.function.Consumer; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -29,8 +30,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.function.Consumer; - /** * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization. */ @@ -53,7 +52,7 @@ public class RuntimeFlowSource implements FlowSource, FlowStageLogic { * @param completionHandler A callback invoked when the source completes. */ public RuntimeFlowSource( - FlowGraph graph, long duration, float utilization, Consumer completionHandler) { + FlowGraph graph, long duration, float utilization, Consumer completionHandler) { if (duration <= 0) { throw new IllegalArgumentException("Duration must be positive and non-zero"); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java index 764a20a8..a0e9cb9d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java @@ -22,6 +22,7 @@ package org.opendc.simulator.flow2.source; +import java.util.function.Consumer; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -29,8 +30,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.function.Consumer; - /** * A flow source that contains a fixed amount and is pushed with a given utilization. */ @@ -52,7 +51,7 @@ public final class SimpleFlowSource implements FlowSource, FlowStageLogic { * @param completionHandler A callback invoked when the source completes. */ public SimpleFlowSource( - FlowGraph graph, float amount, float utilization, Consumer completionHandler) { + FlowGraph graph, float amount, float utilization, Consumer completionHandler) { if (amount < 0.0) { throw new IllegalArgumentException("Amount must be non-negative"); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java index 96d43aef..e8abc2d7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java @@ -22,6 +22,7 @@ package org.opendc.simulator.flow2.source; +import java.util.function.Consumer; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -29,8 +30,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.function.Consumer; - /** * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time. */ -- cgit v1.2.3