From ad88144923d76dfc421f0b22a0b4e670b3f6366e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 21 Aug 2022 14:27:41 +0200 Subject: perf(sim/flow): Add support for multi-flow stages This change adds support for creating nodes in a flow graph that support multiple inputs and outputs directly, instead of our current approach where we need to re-implement the `FlowConsumerContext` interface in order to support multiple inputs or outputs. --- .../org/opendc/simulator/flow2/FlowEngine.java | 260 ++++++++++++++++++ .../java/org/opendc/simulator/flow2/FlowGraph.java | 63 +++++ .../opendc/simulator/flow2/FlowGraphInternal.java | 93 +++++++ .../java/org/opendc/simulator/flow2/FlowStage.java | 303 +++++++++++++++++++++ .../org/opendc/simulator/flow2/FlowStageLogic.java | 38 +++ .../org/opendc/simulator/flow2/FlowStageQueue.java | 109 ++++++++ .../org/opendc/simulator/flow2/FlowTimerQueue.java | 208 ++++++++++++++ .../java/org/opendc/simulator/flow2/InHandler.java | 54 ++++ .../org/opendc/simulator/flow2/InHandlers.java | 53 ++++ .../java/org/opendc/simulator/flow2/InPort.java | 214 +++++++++++++++ .../java/org/opendc/simulator/flow2/Inlet.java | 38 +++ .../opendc/simulator/flow2/InvocationStack.java | 95 +++++++ .../org/opendc/simulator/flow2/OutHandler.java | 47 ++++ .../org/opendc/simulator/flow2/OutHandlers.java | 53 ++++ .../java/org/opendc/simulator/flow2/OutPort.java | 224 +++++++++++++++ .../java/org/opendc/simulator/flow2/Outlet.java | 38 +++ .../simulator/flow2/mux/FlowMultiplexer.java | 70 +++++ .../simulator/flow2/mux/MaxMinFlowMultiplexer.java | 268 ++++++++++++++++++ .../org/opendc/simulator/flow2/sink/FlowSink.java | 36 +++ .../simulator/flow2/sink/SimpleFlowSink.java | 123 +++++++++ .../simulator/flow2/source/EmptyFlowSource.java | 65 +++++ .../opendc/simulator/flow2/source/FlowSource.java | 36 +++ .../simulator/flow2/source/RuntimeFlowSource.java | 129 +++++++++ .../simulator/flow2/source/SimpleFlowSource.java | 132 +++++++++ .../simulator/flow2/source/TraceFlowSource.java | 152 +++++++++++ 25 files changed, 2901 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java (limited to 'opendc-simulator/opendc-simulator-flow/src/main') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java new file mode 100644 index 00000000..0ebb0da9 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import kotlin.coroutines.ContinuationInterceptor; +import kotlin.coroutines.CoroutineContext; +import kotlinx.coroutines.Delay; + +/** + * A {@link FlowEngine} simulates a generic flow network. + *

+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public final class FlowEngine implements Runnable { + /** + * The queue of {@link FlowStage} updates that are scheduled for immediate execution. + */ + private final FlowStageQueue queue = new FlowStageQueue(256); + + /** + * A priority queue containing the {@link FlowStage} updates to be scheduled in the future. + */ + private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + + /** + * The stack of engine invocations to occur in the future. + */ + private final InvocationStack futureInvocations = new InvocationStack(256); + + /** + * A flag to indicate that the engine is active. + */ + private boolean active; + + private final CoroutineContext coroutineContext; + private final Clock clock; + private final Delay delay; + + /** + * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}. + */ + public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) { + return new FlowEngine(coroutineContext, clock); + } + + FlowEngine(CoroutineContext coroutineContext, Clock clock) { + this.coroutineContext = coroutineContext; + this.clock = clock; + + CoroutineContext.Key key = ContinuationInterceptor.Key; + this.delay = (Delay) coroutineContext.get(key); + } + + /** + * Obtain the (virtual) {@link Clock} driving the simulation. + */ + public Clock getClock() { + return clock; + } + + /** + * Return a new {@link FlowGraph} that can be used to build a flow network. + */ + public FlowGraph newGraph() { + return new RootGraph(this); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + */ + void scheduleImmediate(long now, FlowStage ctx) { + scheduleImmediateInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + trySchedule(futureInvocations, now, now); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + *

+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + *

+ * This method should only be invoked while inside an engine cycle. + */ + void scheduleImmediateInContext(FlowStage ctx) { + queue.add(ctx); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + */ + void scheduleDelayed(FlowStage ctx) { + scheduleDelayedInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + long deadline = timerQueue.peekDeadline(); + if (deadline != Long.MAX_VALUE) { + trySchedule(futureInvocations, clock.millis(), deadline); + } + } + + /** + * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + *

+ * This method should only be invoked while inside an engine cycle. + */ + void scheduleDelayedInContext(FlowStage ctx) { + FlowTimerQueue timerQueue = this.timerQueue; + timerQueue.enqueue(ctx); + } + + /** + * Run all the enqueued actions for the specified timestamp (now). + */ + private void doRunEngine(long now) { + final FlowStageQueue queue = this.queue; + final FlowTimerQueue timerQueue = this.timerQueue; + + try { + // Mark the engine as active to prevent concurrent calls to this method + active = true; + + // Execute all scheduled updates at current timestamp + while (true) { + final FlowStage ctx = timerQueue.poll(now); + if (ctx == null) { + break; + } + + ctx.onUpdate(now); + } + + // Execute all immediate updates + while (true) { + final FlowStage ctx = queue.poll(); + if (ctx == null) { + break; + } + + ctx.onUpdate(now); + } + } finally { + active = false; + } + + // Schedule an engine invocation for the next update to occur. + long headDeadline = timerQueue.peekDeadline(); + if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { + trySchedule(futureInvocations, now, headDeadline); + } + } + + @Override + public void run() { + doRunEngine(futureInvocations.poll()); + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param scheduled The queue of scheduled invocations. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + */ + private void trySchedule(InvocationStack scheduled, long now, long target) { + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (scheduled.tryAdd(target)) { + delay.invokeOnTimeout(target - now, this, coroutineContext); + } + } + + /** + * Internal implementation of a root {@link FlowGraph}. + */ + private static final class RootGraph implements FlowGraphInternal { + private final FlowEngine engine; + private final List stages = new ArrayList<>(); + + public RootGraph(FlowEngine engine) { + this.engine = engine; + } + + @Override + public FlowEngine getEngine() { + return engine; + } + + @Override + public FlowStage newStage(FlowStageLogic logic) { + final FlowEngine engine = this.engine; + final FlowStage stage = new FlowStage(this, logic); + stages.add(stage); + long now = engine.getClock().millis(); + stage.invalidate(now); + return stage; + } + + @Override + public void connect(Outlet outlet, Inlet inlet) { + FlowGraphInternal.connect(this, outlet, inlet); + } + + @Override + public void disconnect(Outlet outlet) { + FlowGraphInternal.disconnect(this, outlet); + } + + @Override + public void disconnect(Inlet inlet) { + FlowGraphInternal.disconnect(this, inlet); + } + + /** + * Internal method to remove the specified {@link FlowStage} from the graph. + */ + @Override + public void detach(FlowStage stage) { + stages.remove(stage); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java new file mode 100644 index 00000000..f45be6cd --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an + * amount of flow that cannot exceed the edge's capacity. + */ +public interface FlowGraph { + /** + * Return the {@link FlowEngine} driving the simulation of the graph. + */ + FlowEngine getEngine(); + + /** + * Create a new {@link FlowStage} representing a node in the flow network. + * + * @param logic The logic for handling the events of the stage. + */ + FlowStage newStage(FlowStageLogic logic); + + /** + * Add an edge between the specified outlet port and inlet port in this graph. + * + * @param outlet The outlet of the source from which the flow originates. + * @param inlet The inlet of the sink that should receive the flow. + */ + void connect(Outlet outlet, Inlet inlet); + + /** + * Disconnect the specified {@link Outlet} (if connected). + * + * @param outlet The outlet to disconnect. + */ + void disconnect(Outlet outlet); + + /** + * Disconnect the specified {@link Inlet} (if connected). + * + * @param inlet The inlet to disconnect. + */ + void disconnect(Inlet inlet); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java new file mode 100644 index 00000000..0f608b60 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Interface implemented by {@link FlowGraph} implementations. + */ +interface FlowGraphInternal extends FlowGraph { + /** + * Internal method to remove the specified {@link FlowStage} from the graph. + */ + void detach(FlowStage stage); + + /** + * Helper method to connect an outlet to an inlet. + */ + static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) { + if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) { + throw new IllegalArgumentException("Invalid outlet or inlet passed to graph"); + } + + InPort inPort = (InPort) inlet; + OutPort outPort = (OutPort) outlet; + + if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } else if (outPort.input != null || inPort.output != null) { + throw new IllegalStateException("Inlet or outlet already connected"); + } + + outPort.input = inPort; + inPort.output = outPort; + + inPort.connect(); + outPort.connect(); + } + + /** + * Helper method to disconnect an outlet. + */ + static void disconnect(FlowGraph graph, Outlet outlet) { + if (!(outlet instanceof OutPort)) { + throw new IllegalArgumentException("Invalid outlet passed to graph"); + } + + OutPort outPort = (OutPort) outlet; + + if (!graph.equals(outPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } + + outPort.cancel(null); + outPort.complete(); + } + + /** + * Helper method to disconnect an inlet. + */ + static void disconnect(FlowGraph graph, Inlet inlet) { + if (!(inlet instanceof InPort)) { + throw new IllegalArgumentException("Invalid outlet passed to graph"); + } + + InPort inPort = (InPort) inlet; + + if (!graph.equals(inPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } + + inPort.finish(null); + inPort.cancel(null); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java new file mode 100644 index 00000000..4d098043 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java @@ -0,0 +1,303 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link FlowStage} represents a node in a {@link FlowGraph}. + */ +public final class FlowStage { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class); + + /** + * States of the flow stage. + */ + private static final int STAGE_PENDING = 0; // Stage is pending to be started + + private static final int STAGE_ACTIVE = 1; // Stage is actively running + private static final int STAGE_CLOSED = 2; // Stage is closed + private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage + + /** + * Flags of the flow connection + */ + private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated + + private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed + private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active + private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending + + /** + * The flags representing the state and pending actions for the stage. + */ + private int flags = STAGE_PENDING; + + /** + * The deadline of the stage after which an update should run. + */ + long deadline = Long.MAX_VALUE; + + /** + * The index of the timer in the {@link FlowTimerQueue}. + */ + int timerIndex = -1; + + final Clock clock; + private final FlowStageLogic logic; + final FlowGraphInternal parentGraph; + private final FlowEngine engine; + + private final Map inlets = new HashMap<>(); + private final Map outlets = new HashMap<>(); + private int nextInlet = 0; + private int nextOutlet = 0; + + /** + * Construct a new {@link FlowStage} instance. + * + * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param logic The logic of the stage. + */ + FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) { + this.parentGraph = parentGraph; + this.logic = logic; + this.engine = parentGraph.getEngine(); + this.clock = engine.getClock(); + } + + /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** + * Return the {@link Inlet} (an in-going edge) with the specified name for this {@link FlowStage}. + * If an inlet with that name does not exist, a new one is allocated for the stage. + * + * @param name The name of the inlet. + * @return The {@link InPort} representing an {@link Inlet} with the specified name. + */ + public InPort getInlet(String name) { + return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++)); + } + + /** + * Return the {@link Outlet} (an out-going edge) with the specified name for this {@link FlowStage}. + * If an outlet with that name does not exist, a new one is allocated for the stage. + * + * @param name The name of the outlet. + * @return The {@link OutPort} representing an {@link Outlet} with the specified name. + */ + public OutPort getOutlet(String name) { + return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++)); + } + + /** + * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + /** + * Set the deadline of the {@link FlowStage}'s timer. + * + * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted. + */ + public void setDeadline(long deadline) { + this.deadline = deadline; + + if ((flags & STAGE_UPDATE_ACTIVE) == 0) { + // Update the timer queue with the new deadline + engine.scheduleDelayed(this); + } + } + + /** + * Invalidate the {@link FlowStage} forcing the stage to update. + */ + public void invalidate() { + int flags = this.flags; + + if ((flags & STAGE_UPDATE_ACTIVE) == 0) { + scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE); + } + } + + /** + * Close the {@link FlowStage} and disconnect all inlets and outlets. + */ + public void close() { + int flags = this.flags; + + if ((flags & STAGE_STATE) == STAGE_CLOSED) { + return; + } + + // Toggle the close bit. In case no update is active, schedule a new update. + if ((flags & STAGE_UPDATE_ACTIVE) != 0) { + this.flags = flags | STAGE_CLOSE; + } else { + scheduleImmediate(clock.millis(), flags | STAGE_CLOSE); + } + } + + /** + * Update the state of the flow stage. + * + * @param now The current virtual timestamp. + */ + void onUpdate(long now) { + int flags = this.flags; + int state = flags & STAGE_STATE; + + if (state == STAGE_ACTIVE) { + doUpdate(now, flags); + } else if (state == STAGE_PENDING) { + doStart(now, flags); + } + } + + /** + * Invalidate the {@link FlowStage} forcing the stage to update. + * + *

+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to + * prevent having to re-query the clock. This method should not be called during an update. + */ + void invalidate(long now) { + scheduleImmediate(now, flags | STAGE_INVALIDATE); + } + + /** + * Schedule an immediate update for this stage. + */ + private void scheduleImmediate(long now, int flags) { + // In case an immediate update is already scheduled, no need to do anything + if ((flags & STAGE_UPDATE_PENDING) != 0) { + this.flags = flags; + return; + } + + // Mark the stage that there is an update pending + this.flags = flags | STAGE_UPDATE_PENDING; + + engine.scheduleImmediate(now, this); + } + + /** + * Start the stage. + */ + private void doStart(long now, int flags) { + // Update state before calling into the outside world, so it observes a consistent state + flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE; + + doUpdate(now, flags); + } + + /** + * Update the state of the stage. + */ + private void doUpdate(long now, int flags) { + long deadline = this.deadline; + long newDeadline = deadline; + + // Update the stage if: + // (1) the timer of the stage has expired. + // (2) one of the input ports is pushed, + // (3) one of the output ports is pulled, + if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) { + // Update state before calling into the outside world, so it observes a consistent state + this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE; + + try { + newDeadline = logic.onUpdate(this, now); + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = this.flags; + } catch (Exception e) { + doFail(e); + } + } + + // Check whether the stage is marked as closing. + if ((flags & STAGE_CLOSE) != 0) { + doClose(flags, null); + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = this.flags; + } + + // Indicate that no update is active anymore and flush the flags + this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING); + this.deadline = newDeadline; + + // Update the timer queue with the new deadline + engine.scheduleDelayedInContext(this); + } + + /** + * This method is invoked when an uncaught exception is caught by the engine. When this happens, the + * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs. + */ + void doFail(Throwable cause) { + LOGGER.warn("Uncaught exception (closing stage)", cause); + + doClose(flags, cause); + } + + /** + * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure. + */ + private void doClose(int flags, Throwable cause) { + // Mark the stage as closed + this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED; + + // Remove stage from parent graph + parentGraph.detach(this); + + // Remove stage from the timer queue + setDeadline(Long.MAX_VALUE); + + // Cancel all input ports + for (InPort port : inlets.values()) { + if (port != null) { + port.cancel(cause); + } + } + + // Cancel all output ports + for (OutPort port : outlets.values()) { + if (port != null) { + port.fail(cause); + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java new file mode 100644 index 00000000..70986a35 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via + * out-going flows based on its potential inputs. + */ +public interface FlowStageLogic { + /** + * This method is invoked when the one of the stage's inlets or outlets is invalidated. + * + * @param ctx The context in which the stage runs. + * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. + * @return The next deadline for the stage. + */ + long onUpdate(FlowStage ctx, long now); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java new file mode 100644 index 00000000..56ec7702 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.ArrayDeque; +import java.util.Arrays; + +/** + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s + * that have been updated during the engine cycle and should converge. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class FlowStageQueue { + /** + * The array of elements in the queue. + */ + private FlowStage[] elements; + + private int head = 0; + private int tail = 0; + + public FlowStageQueue(int initialCapacity) { + elements = new FlowStage[initialCapacity]; + } + + /** + * Add the specified context to the queue. + */ + void add(FlowStage ctx) { + final FlowStage[] es = elements; + int tail = this.tail; + + es[tail] = ctx; + + tail = inc(tail, es.length); + this.tail = tail; + + if (head == tail) { + doubleCapacity(); + } + } + + /** + * Remove a {@link FlowStage} from the queue or null if the queue is empty. + */ + FlowStage poll() { + final FlowStage[] es = elements; + int head = this.head; + FlowStage ctx = es[head]; + + if (ctx != null) { + es[head] = null; + this.head = inc(head, es.length); + } + + return ctx; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity); + + // Exceptionally, here tail == head needs to be disambiguated + if (tail < head || (tail == head && es[head] != null)) { + // wrap around; slide first leg forward to end of array + int newSpace = newCapacity - oldCapacity; + System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); + for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; + } + } + + /** + * Circularly increments i, mod modulus. + * Precondition and postcondition: 0 <= i < modulus. + */ + private static int inc(int i, int modulus) { + if (++i >= modulus) i = 0; + return i; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java new file mode 100644 index 00000000..4b746202 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.Arrays; + +/** + * A specialized priority queue for timers of {@link FlowStageLogic}s. + *

+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +final class FlowTimerQueue { + /** + * Array representation of binary heap of {@link FlowStage} instances. + */ + private FlowStage[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowTimerQueue(int initialCapacity) { + this.queue = new FlowStage[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + void enqueue(FlowStage ctx) { + FlowStage[] es = queue; + int k = ctx.timerIndex; + + if (ctx.deadline != Long.MAX_VALUE) { + if (k >= 0) { + update(es, ctx, k); + } else { + add(es, ctx); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed now. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed now, otherwise null. + */ + FlowStage poll(long now) { + int size = this.size; + if (size == 0) { + return null; + } + + final FlowStage[] es = queue; + final FlowStage head = es[0]; + + if (now < head.deadline) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowStage next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.timerIndex = -1; + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + long peekDeadline() { + if (size > 0) { + return queue[0].deadline; + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowStage[] es, FlowStage ctx) { + int i = size; + + if (i >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(i, ctx, es); + + size = i + 1; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowStage[] es, FlowStage ctx, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].deadline > ctx.deadline) { + siftUp(k, ctx, es); + return; + } + } + + siftDown(k, ctx, es, size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowStage[] es, int k) { + int s = --size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowStage moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowStage[] grow() { + FlowStage[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowStage key, FlowStage[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowStage e = es[parent]; + if (key.deadline >= e.deadline) break; + es[k] = e; + e.timerIndex = k; + k = parent; + } + es[k] = key; + key.timerIndex = k; + } + + private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowStage c = es[child]; + int right = child + 1; + if (right < n && c.deadline > es[right].deadline) c = es[child = right]; + + if (key.deadline <= c.deadline) break; + + es[k] = c; + c.timerIndex = k; + k = child; + } + + es[k] = key; + key.timerIndex = k; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java new file mode 100644 index 00000000..839b01db --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}. + */ +public interface InHandler { + /** + * Return the actual flow rate over the input port. + * + * @param port The input port to which the flow was pushed. + * @return The actual flow rate over the port. + */ + default float getRate(InPort port) { + return Math.min(port.getDemand(), port.getCapacity()); + } + + /** + * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet. + * + * @param port The input port to which the flow was pushed. + * @param demand The rate of flow the output attempted to push to the port. + */ + void onPush(InPort port, float demand); + + /** + * This method is invoked when the input port is finished. + * + * @param port The input port that has finished. + * @param cause The cause of the input port being finished or null if the port completed successfully. + */ + void onUpstreamFinish(InPort port, Throwable cause); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java new file mode 100644 index 00000000..9d5b4bef --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A collection of common {@link InHandler} implementations. + */ +public class InHandlers { + /** + * Prevent construction of this class. + */ + private InHandlers() {} + + /** + * Return an {@link InHandler} that does nothing. + */ + public static InHandler noop() { + return NoopInHandler.INSTANCE; + } + + /** + * No-op implementation of {@link InHandler}. + */ + private static final class NoopInHandler implements InHandler { + public static final InHandler INSTANCE = new NoopInHandler(); + + @Override + public void onPush(InPort port, float demand) {} + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) {} + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java new file mode 100644 index 00000000..fba12aaf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.Objects; + +/** + * A port that consumes a flow. + *

+ * Input ports are represented as in-going edges in the flow graph. + */ +public final class InPort implements Inlet { + private final int id; + + private float capacity; + private float demand; + + private boolean mask; + + OutPort output; + private InHandler handler = InHandlers.noop(); + private final Clock clock; + private final String name; + private final FlowStage stage; + + InPort(FlowStage stage, String name, int id) { + this.name = name; + this.id = id; + this.stage = stage; + this.clock = stage.clock; + } + + @Override + public FlowGraph getGraph() { + return stage.parentGraph; + } + + @Override + public String getName() { + return name; + } + + /** + * Return the identifier of the {@link InPort} with respect to its stage. + */ + public int getId() { + return id; + } + + /** + * Return the current capacity of the input port. + */ + public float getCapacity() { + return capacity; + } + + /** + * Return the current demand of flow of the input port. + */ + public float getDemand() { + return demand; + } + + /** + * Return the current rate of flow of the input port. + */ + public float getRate() { + return handler.getRate(this); + } + + /** + * Pull the flow with the specified capacity from the input port. + * + * @param capacity The maximum throughput that the stage can receive from the input port. + */ + public void pull(float capacity) { + this.capacity = capacity; + + OutPort output = this.output; + if (output != null) { + output.pull(capacity); + } + } + + /** + * Return the current {@link InHandler} of the input port. + */ + public InHandler getHandler() { + return handler; + } + + /** + * Set the {@link InHandler} of the input port. + */ + public void setHandler(InHandler handler) { + this.handler = handler; + } + + /** + * Return the mask of this port. + *

+ * Stages ignore events originating from masked ports. + */ + public boolean getMask() { + return mask; + } + + /** + * (Un)mask the port. + */ + public void setMask(boolean mask) { + this.mask = mask; + } + + /** + * Disconnect the input port from its (potentially) connected outlet. + *

+ * The inlet can still be used and re-connected to another outlet. + * + * @param cause The cause for disconnecting the port or null when no more flow is needed. + */ + public void cancel(Throwable cause) { + demand = 0.f; + + OutPort output = this.output; + if (output != null) { + this.output = null; + output.input = null; + output.cancel(cause); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + InPort port = (InPort) o; + return stage.equals(port.stage) && name.equals(port.name); + } + + @Override + public int hashCode() { + return Objects.hash(stage.parentGraph, name); + } + + /** + * This method is invoked when the inlet is connected to an outlet. + */ + void connect() { + OutPort output = this.output; + output.pull(capacity); + } + + /** + * Push a flow from an outlet to this inlet. + * + * @param demand The rate of flow to push. + */ + void push(float demand) { + // No-op when the rate is unchanged + if (this.demand == demand) { + return; + } + + try { + handler.onPush(this, demand); + this.demand = demand; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } + + /** + * This method is invoked by the connected {@link OutPort} when it finishes. + */ + void finish(Throwable cause) { + try { + long now = clock.millis(); + handler.onUpstreamFinish(this, cause); + this.demand = 0.f; + + if (!mask) { + stage.invalidate(now); + } + } catch (Exception e) { + stage.doFail(e); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java new file mode 100644 index 00000000..4a9ea6a5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * An in-going edge in a {@link FlowGraph}. + */ +public interface Inlet { + /** + * Return the {@link FlowGraph} to which the inlet is exposed. + */ + FlowGraph getGraph(); + + /** + * Return the name of the inlet. + */ + String getName(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java new file mode 100644 index 00000000..a5b5114b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.Arrays; + +/** + * A specialized monotonic stack implementation for tracking the scheduled engine invocations. + *

+ * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class InvocationStack { + /** + * The array of elements in the stack. + */ + private long[] elements; + + private int head = -1; + + public InvocationStack(int initialCapacity) { + elements = new long[initialCapacity]; + Arrays.fill(elements, Long.MIN_VALUE); + } + + /** + * Try to add the specified invocation to the monotonic stack. + * + * @param invocation The timestamp of the invocation. + * @return true if the invocation was added, false otherwise. + */ + boolean tryAdd(long invocation) { + final long[] es = elements; + int head = this.head; + + if (head < 0 || es[head] > invocation) { + es[head + 1] = invocation; + this.head = head + 1; + + if (head + 2 == es.length) { + doubleCapacity(); + } + + return true; + } + + return false; + } + + /** + * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. + */ + long poll() { + final long[] es = elements; + int head = this.head--; + + if (head >= 0) { + return es[head]; + } + + return Long.MAX_VALUE; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + elements = Arrays.copyOf(elements, newCapacity); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java new file mode 100644 index 00000000..723c6d6b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}. + */ +public interface OutHandler { + /** + * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet. + * + * @param port The output port of which the capacity was changed. + * @param capacity The new capacity of the outlet. + */ + void onPull(OutPort port, float capacity); + + /** + * This method is invoked when the output port no longer accepts any flow. + *

+ * After this callback no other callbacks will be called for this port. + * + * @param port The outlet that no longer accepts any flow. + * @param cause The cause of the output port no longer accepting any flow or null if the port closed + * successfully. + */ + void onDownstreamFinish(OutPort port, Throwable cause); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java new file mode 100644 index 00000000..8fbfda0d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A collection of common {@link OutHandler} implementations. + */ +public class OutHandlers { + /** + * Prevent construction of this class. + */ + private OutHandlers() {} + + /** + * Return an {@link OutHandler} that does nothing. + */ + public static OutHandler noop() { + return NoopOutHandler.INSTANCE; + } + + /** + * No-op implementation of {@link OutHandler}. + */ + private static final class NoopOutHandler implements OutHandler { + public static final OutHandler INSTANCE = new NoopOutHandler(); + + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) {} + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java new file mode 100644 index 00000000..332296a0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.Objects; + +/** + * A port that outputs a flow. + *

+ * Output ports are represented as out-going edges in the flow graph. + */ +public final class OutPort implements Outlet { + private final int id; + + private float capacity; + private float demand; + + private boolean mask; + + InPort input; + private OutHandler handler = OutHandlers.noop(); + private final String name; + private final FlowStage stage; + private final Clock clock; + + OutPort(FlowStage stage, String name, int id) { + this.name = name; + this.id = id; + this.stage = stage; + this.clock = stage.clock; + } + + @Override + public FlowGraph getGraph() { + return stage.parentGraph; + } + + @Override + public String getName() { + return name; + } + + /** + * Return the identifier of the {@link OutPort} with respect to its stage. + */ + public int getId() { + return id; + } + + /** + * Return the capacity of the output port. + */ + public float getCapacity() { + return capacity; + } + + /** + * Return the current demand of flow of the output port. + */ + public float getDemand() { + return demand; + } + + /** + * Return the current rate of flow of the input port. + */ + public float getRate() { + InPort input = this.input; + if (input != null) { + return input.getRate(); + } + + return 0.f; + } + + /** + * Return the current {@link OutHandler} of the output port. + */ + public OutHandler getHandler() { + return handler; + } + + /** + * Set the {@link OutHandler} of the output port. + */ + public void setHandler(OutHandler handler) { + this.handler = handler; + } + + /** + * Return the mask of this port. + *

+ * Stages ignore events originating from masked ports. + */ + public boolean getMask() { + return mask; + } + + /** + * (Un)mask the port. + */ + public void setMask(boolean mask) { + this.mask = mask; + } + + /** + * Push the given flow rate over output port. + * + * @param rate The rate of the flow to push. + */ + public void push(float rate) { + demand = rate; + InPort input = this.input; + + if (input != null) { + input.push(rate); + } + } + + /** + * Signal to the downstream port that the output has completed successfully and disconnect the port from its input. + *

+ * The output port can still be used and re-connected to another input. + */ + public void complete() { + fail(null); + } + + /** + * Signal a failure to the downstream port and disconnect the port from its input. + *

+ * The output can still be used and re-connected to another input. + */ + public void fail(Throwable cause) { + capacity = 0.f; + + InPort input = this.input; + if (input != null) { + this.input = null; + input.output = null; + input.finish(cause); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OutPort port = (OutPort) o; + return stage.equals(port.stage) && name.equals(port.name); + } + + @Override + public int hashCode() { + return Objects.hash(stage.parentGraph, name); + } + + /** + * This method is invoked when the outlet is connected to an inlet. + */ + void connect() { + input.push(demand); + } + + /** + * Pull from this outlet with a specified capacity. + * + * @param capacity The capacity of the inlet. + */ + void pull(float capacity) { + // No-op when outlet is not active or the rate is unchanged + if (this.capacity == capacity) { + return; + } + + try { + handler.onPull(this, capacity); + this.capacity = capacity; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } + + /** + * This method is invoked by the connected {@link InPort} when downstream cancels the connection. + */ + void cancel(Throwable cause) { + try { + handler.onDownstreamFinish(this, cause); + this.capacity = 0.f; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java new file mode 100644 index 00000000..32e19a3b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * An out-going edge in a {@link FlowGraph}. + */ +public interface Outlet { + /** + * Return the {@link FlowGraph} to which the outlet is exposed. + */ + FlowGraph getGraph(); + + /** + * Return the name of the outlet. + */ + String getName(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java new file mode 100644 index 00000000..1a99d0cf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs. + */ +public interface FlowMultiplexer { + /** + * Return the number of active inputs on this multiplexer. + */ + int getInputCount(); + + /** + * Allocate a new input on this multiplexer with the specified capacity.. + * + * @return The identifier of the input for this stage. + */ + Inlet newInput(); + + /** + * Release the input at the specified slot. + * + * @param inlet The inlet to release. + */ + void releaseInput(Inlet inlet); + + /** + * Return the number of active outputs on this multiplexer. + */ + int getOutputCount(); + + /** + * Allocate a new output on this multiplexer. + * + * @return The outlet for this stage. + */ + Outlet newOutput(); + + /** + * Release the output at the specified slot. + * + * @param outlet The outlet to release. + */ + void releaseOutput(Outlet outlet); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java new file mode 100644 index 00000000..ca0639f5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.Arrays; +import java.util.BitSet; + +/** + * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs + * using max-min fair sharing. + *

+ * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined + * output capacity, but allows individual inputs to use more capacity if there is still capacity left. + */ +public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + private final FlowStage stage; + private final BitSet activeInputs; + private final BitSet activeOutputs; + + private float capacity = 0.f; + private float demand = 0.f; + private float rate = 0.f; + + private InPort[] inlets; + private long[] inputs; + private float[] rates; + private OutPort[] outlets; + + private final MultiplexerInHandler inHandler = new MultiplexerInHandler(); + private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler(); + + /** + * Construct a {@link MaxMinFlowMultiplexer} instance. + * + * @param graph The {@link FlowGraph} to add the multiplexer to. + */ + public MaxMinFlowMultiplexer(FlowGraph graph) { + this.stage = graph.newStage(this); + this.activeInputs = new BitSet(); + this.activeOutputs = new BitSet(); + + this.inlets = new InPort[4]; + this.inputs = new long[4]; + this.rates = new float[4]; + this.outlets = new OutPort[4]; + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + float capacity = this.capacity; + float demand = this.demand; + float rate = demand; + + if (demand > capacity) { + rate = redistributeCapacity(inlets, inputs, rates, capacity); + } + + if (this.rate != rate) { + // Only update the outputs if the output rate has changed + this.rate = rate; + + changeRate(activeOutputs, outlets, capacity, rate); + } + + return Long.MAX_VALUE; + } + + @Override + public int getInputCount() { + return activeInputs.length(); + } + + @Override + public Inlet newInput() { + final BitSet activeInputs = this.activeInputs; + int slot = activeInputs.nextClearBit(0); + + InPort port = stage.getInlet("in" + slot); + port.setHandler(inHandler); + port.pull(this.capacity); + + InPort[] inlets = this.inlets; + if (slot >= inlets.length) { + int newLength = inlets.length + (inlets.length >> 1); + inlets = Arrays.copyOf(inlets, newLength); + inputs = Arrays.copyOf(inputs, newLength); + rates = Arrays.copyOf(rates, newLength); + this.inlets = inlets; + } + inlets[slot] = port; + + activeInputs.set(slot); + return port; + } + + @Override + public void releaseInput(Inlet inlet) { + InPort port = (InPort) inlet; + + activeInputs.clear(port.getId()); + port.cancel(null); + } + + @Override + public int getOutputCount() { + return activeOutputs.length(); + } + + @Override + public Outlet newOutput() { + final BitSet activeOutputs = this.activeOutputs; + int slot = activeOutputs.nextClearBit(0); + + OutPort port = stage.getOutlet("out" + slot); + port.setHandler(outHandler); + + OutPort[] outlets = this.outlets; + if (slot >= outlets.length) { + int newLength = outlets.length + (outlets.length >> 1); + outlets = Arrays.copyOf(outlets, newLength); + this.outlets = outlets; + } + outlets[slot] = port; + + activeOutputs.set(slot); + return port; + } + + @Override + public void releaseOutput(Outlet outlet) { + OutPort port = (OutPort) outlet; + activeInputs.clear(port.getId()); + port.complete(); + } + + /** + * Helper function to redistribute the specified capacity across the inlets. + */ + private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) { + // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the + // constrained capacity across the inputs. + for (int i = 0; i < inputs.length; i++) { + InPort inlet = inlets[i]; + if (inlet == null) { + break; + } + + inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL); + } + Arrays.sort(inputs); + + float availableCapacity = capacity; + int inputSize = inputs.length; + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + for (int i = 0; i < inputs.length; i++) { + long v = inputs[i]; + int slot = (int) v; + float d = Float.intBitsToFloat((int) (v >> 32)); + + if (d == 0.0) { + continue; + } + + float availableShare = availableCapacity / (inputSize - i); + float r = Math.min(d, availableShare); + + rates[slot] = r; + availableCapacity -= r; + } + + return capacity - availableCapacity; + } + + /** + * Helper method to change the rate of the outlets. + */ + private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) { + // Divide the requests over the available capacity of the input resources fairly + for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { + OutPort outlet = outlets[i]; + float fraction = outlet.getCapacity() / capacity; + outlet.push(rate * fraction); + } + } + + /** + * A {@link InHandler} implementation for the multiplexer inputs. + */ + private class MultiplexerInHandler implements InHandler { + @Override + public float getRate(InPort port) { + return rates[port.getId()]; + } + + @Override + public void onPush(InPort port, float demand) { + MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand; + rates[port.getId()] = demand; + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + MaxMinFlowMultiplexer.this.demand -= port.getDemand(); + releaseInput(port); + rates[port.getId()] = 0.f; + } + } + + /** + * A {@link OutHandler} implementation for the multiplexer outputs. + */ + private class MultiplexerOutHandler implements OutHandler { + @Override + public void onPull(OutPort port, float capacity) { + float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity; + MaxMinFlowMultiplexer.this.capacity = newCapacity; + changeInletCapacity(newCapacity); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity(); + MaxMinFlowMultiplexer.this.capacity = newCapacity; + releaseOutput(port); + changeInletCapacity(newCapacity); + } + + private void changeInletCapacity(float capacity) { + BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs; + InPort[] inlets = MaxMinFlowMultiplexer.this.inlets; + + for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) { + inlets[i].pull(capacity); + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java new file mode 100644 index 00000000..69c94708 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink; + +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.Inlet; + +/** + * A {@link FlowStage} with a single input. + */ +public interface FlowSink { + /** + * Return the input of this {@link FlowSink}. + */ + Inlet getInput(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java new file mode 100644 index 00000000..fdfe5ee8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; + +/** + * A sink with a fixed capacity. + */ +public final class SimpleFlowSink implements FlowSink, FlowStageLogic { + private final FlowStage stage; + private final InPort input; + private final Handler handler; + + /** + * Construct a new {@link SimpleFlowSink} with the specified initial capacity. + * + * @param graph The graph to add the sink to. + * @param initialCapacity The initial capacity of the sink. + */ + public SimpleFlowSink(FlowGraph graph, float initialCapacity) { + this.stage = graph.newStage(this); + this.handler = new Handler(); + this.input = stage.getInlet("in"); + this.input.pull(initialCapacity); + this.input.setMask(true); + this.input.setHandler(handler); + } + + /** + * Return the {@link Inlet} of this sink. + */ + @Override + public Inlet getInput() { + return input; + } + + /** + * Return the capacity of the sink. + */ + public float getCapacity() { + return input.getCapacity(); + } + + /** + * Update the capacity of the sink. + * + * @param capacity The new capacity to update the sink to. + */ + public void setCapacity(float capacity) { + input.pull(capacity); + stage.invalidate(); + } + + /** + * Return the flow rate of the sink. + */ + public float getRate() { + return input.getRate(); + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + InPort input = this.input; + handler.rate = Math.min(input.getDemand(), input.getCapacity()); + return Long.MAX_VALUE; + } + + /** + * The {@link InHandler} implementation for the sink. + */ + private static final class Handler implements InHandler { + float rate; + + @Override + public float getRate(InPort port) { + return rate; + } + + @Override + public void onPush(InPort port, float demand) { + float capacity = port.getCapacity(); + rate = Math.min(demand, capacity); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + rate = 0.f; + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java new file mode 100644 index 00000000..2dcc66e4 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * An empty {@link FlowSource}. + */ +public final class EmptyFlowSource implements FlowSource, FlowStageLogic { + private final FlowStage stage; + private final OutPort output; + + /** + * Construct a new {@link EmptyFlowSource}. + */ + public EmptyFlowSource(FlowGraph graph) { + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java new file mode 100644 index 00000000..f9432c33 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowStage} with a single output. + */ +public interface FlowSource { + /** + * Return the output of this {@link FlowSource}. + */ + Outlet getOutput(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java new file mode 100644 index 00000000..a237c81e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.function.Consumer; + +/** + * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization. + */ +public class RuntimeFlowSource implements FlowSource, FlowStageLogic { + private final float utilization; + + private final FlowStage stage; + private final OutPort output; + private final Consumer completionHandler; + + private long duration; + private long lastPull; + + /** + * Construct a {@link RuntimeFlowSource} instance. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param duration The duration of the source. + * @param utilization The utilization of the capacity of the outlet. + * @param completionHandler A callback invoked when the source completes. + */ + public RuntimeFlowSource( + FlowGraph graph, long duration, float utilization, Consumer completionHandler) { + if (duration <= 0) { + throw new IllegalArgumentException("Duration must be positive and non-zero"); + } + + if (utilization <= 0.0) { + throw new IllegalArgumentException("Utilization must be positive and non-zero"); + } + + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(RuntimeFlowSource.this); + } + }); + this.duration = duration; + this.utilization = utilization; + this.completionHandler = completionHandler; + this.lastPull = graph.getEngine().getClock().millis(); + } + + /** + * Construct a new {@link RuntimeFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param duration The duration of the source. + * @param utilization The utilization of the capacity of the outlet. + */ + public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) { + this(graph, duration, utilization, RuntimeFlowSource::close); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + long lastPull = this.lastPull; + this.lastPull = now; + + long delta = Math.max(0, now - lastPull); + + OutPort output = this.output; + float limit = output.getCapacity() * utilization; + long duration = this.duration - delta; + + if (duration <= 0) { + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + this.duration = duration; + output.push(limit); + return now + duration; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java new file mode 100644 index 00000000..764a20a8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.function.Consumer; + +/** + * A flow source that contains a fixed amount and is pushed with a given utilization. + */ +public final class SimpleFlowSource implements FlowSource, FlowStageLogic { + private final float utilization; + private float remainingAmount; + private long lastPull; + + private final FlowStage stage; + private final OutPort output; + private final Consumer completionHandler; + + /** + * Construct a new {@link SimpleFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param amount The amount to transfer via the outlet. + * @param utilization The utilization of the capacity of the outlet. + * @param completionHandler A callback invoked when the source completes. + */ + public SimpleFlowSource( + FlowGraph graph, float amount, float utilization, Consumer completionHandler) { + if (amount < 0.0) { + throw new IllegalArgumentException("Amount must be non-negative"); + } + + if (utilization <= 0.0) { + throw new IllegalArgumentException("Utilization must be positive and non-zero"); + } + + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(SimpleFlowSource.this); + } + }); + this.completionHandler = completionHandler; + this.utilization = utilization; + this.remainingAmount = amount; + this.lastPull = graph.getEngine().getClock().millis(); + } + + /** + * Construct a new {@link SimpleFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param amount The amount to transfer via the outlet. + * @param utilization The utilization of the capacity of the outlet. + */ + public SimpleFlowSource(FlowGraph graph, float amount, float utilization) { + this(graph, amount, utilization, SimpleFlowSource::close); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + long lastPull = this.lastPull; + this.lastPull = now; + + long delta = Math.max(0, now - lastPull); + + OutPort output = this.output; + float consumed = output.getRate() * delta / 1000.f; + float limit = output.getCapacity() * utilization; + + float remainingAmount = this.remainingAmount - consumed; + this.remainingAmount = remainingAmount; + + long duration = (long) Math.ceil(remainingAmount / limit * 1000); + + if (duration <= 0) { + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + output.push(limit); + return now + duration; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java new file mode 100644 index 00000000..96d43aef --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +import java.util.function.Consumer; + +/** + * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time. + */ +public final class TraceFlowSource implements FlowSource, FlowStageLogic { + private final OutPort output; + private final long[] deadlines; + private final float[] usages; + private final int size; + private int index; + + private final FlowStage stage; + private final Consumer completionHandler; + + /** + * Construct a {@link TraceFlowSource}. + * + * @param graph The {@link FlowGraph} to which the source belongs. + * @param trace The {@link Trace} to replay. + * @param completionHandler The completion handler to invoke when the source finishes. + */ + public TraceFlowSource(FlowGraph graph, Trace trace, Consumer completionHandler) { + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(TraceFlowSource.this); + } + }); + this.deadlines = trace.deadlines; + this.usages = trace.usages; + this.size = trace.size; + this.completionHandler = completionHandler; + } + + /** + * Construct a {@link TraceFlowSource}. + * + * @param graph The {@link FlowGraph} to which the source belongs. + * @param trace The {@link Trace} to replay. + */ + public TraceFlowSource(FlowGraph graph, Trace trace) { + this(graph, trace, TraceFlowSource::close); + } + + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + int size = this.size; + int index = this.index; + long[] deadlines = this.deadlines; + long deadline; + + do { + deadline = deadlines[index]; + } while (deadline <= now && ++index < size); + + if (index >= size) { + output.push(0.0f); + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + this.index = index; + float usage = usages[index]; + output.push(usage); + + return deadline; + } + + /** + * A trace describes the workload over time. + */ + public static final class Trace { + private final long[] deadlines; + private final float[] usages; + private final int size; + + /** + * Construct a {@link Trace}. + * + * @param deadlines The deadlines of the trace fragments. + * @param usages The usages of the trace fragments. + * @param size The size of the trace. + */ + public Trace(long[] deadlines, float[] usages, int size) { + this.deadlines = deadlines; + this.usages = usages; + this.size = size; + } + + public long[] getDeadlines() { + return deadlines; + } + + public float[] getUsages() { + return usages; + } + + public int getSize() { + return size; + } + } +} -- cgit v1.2.3