summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-08-21 14:27:41 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:13:03 +0200
commitad88144923d76dfc421f0b22a0b4e670b3f6366e (patch)
treed1721cfc33dd76a0eb13c0c00f8a3320f7652863 /opendc-simulator/opendc-simulator-flow/src
parenta832ea376e360f3029036a9570c244fb9080e91f (diff)
perf(sim/flow): Add support for multi-flow stages
This change adds support for creating nodes in a flow graph that support multiple inputs and outputs directly, instead of our current approach where we need to re-implement the `FlowConsumerContext` interface in order to support multiple inputs or outputs.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt108
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java260
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java63
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java93
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java303
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java109
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java208
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java214
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java47
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java224
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java70
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java268
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java123
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java65
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java129
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java132
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java152
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt197
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt385
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt71
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt124
32 files changed, 3841 insertions, 1 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
index 58f84d82..9e0a4a5e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -47,7 +47,7 @@ class FlowBenchmarks {
@Setup
fun setUp() {
val random = ThreadLocalRandom.current()
- val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
+ val entries = List(1000000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
trace = entries.asSequence()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
new file mode 100644
index 00000000..1b0e2e9e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import kotlinx.coroutines.launch
+import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.TraceFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+import org.openjdk.jmh.annotations.Benchmark
+import org.openjdk.jmh.annotations.Fork
+import org.openjdk.jmh.annotations.Measurement
+import org.openjdk.jmh.annotations.Scope
+import org.openjdk.jmh.annotations.Setup
+import org.openjdk.jmh.annotations.State
+import org.openjdk.jmh.annotations.Warmup
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.TimeUnit
+
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+class FlowBenchmarks {
+ private lateinit var trace: TraceFlowSource.Trace
+
+ @Setup
+ fun setUp() {
+ val random = ThreadLocalRandom.current()
+ val traceSize = 10_000_000
+ trace = TraceFlowSource.Trace(
+ LongArray(traceSize) { (it + 1) * 1000L },
+ FloatArray(traceSize) { random.nextFloat(0.0f, 4500.0f) },
+ traceSize
+ )
+ }
+
+ @Benchmark
+ fun benchmarkSink() {
+ return runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val sink = SimpleFlowSink(graph, 4200.0f)
+ val source = TraceFlowSource(graph, trace)
+ graph.connect(source.output, sink.input)
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxMaxMinSingleSource() {
+ return runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val switch = MaxMinFlowMultiplexer(graph)
+
+ val sinkA = SimpleFlowSink(graph, 3000.0f)
+ val sinkB = SimpleFlowSink(graph, 3000.0f)
+
+ graph.connect(switch.newOutput(), sinkA.input)
+ graph.connect(switch.newOutput(), sinkB.input)
+
+ val source = TraceFlowSource(graph, trace)
+ graph.connect(source.output, switch.newInput())
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxMaxMinTripleSource() {
+ return runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val switch = MaxMinFlowMultiplexer(graph)
+
+ val sinkA = SimpleFlowSink(graph, 3000.0f)
+ val sinkB = SimpleFlowSink(graph, 3000.0f)
+
+ graph.connect(switch.newOutput(), sinkA.input)
+ graph.connect(switch.newOutput(), sinkB.input)
+
+ repeat(3) {
+ launch {
+ val source = TraceFlowSource(graph, trace)
+ graph.connect(source.output, switch.newInput())
+ }
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
new file mode 100644
index 00000000..0ebb0da9
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import kotlin.coroutines.ContinuationInterceptor;
+import kotlin.coroutines.CoroutineContext;
+import kotlinx.coroutines.Delay;
+
+/**
+ * A {@link FlowEngine} simulates a generic flow network.
+ * <p>
+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
+ * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
+ */
+public final class FlowEngine implements Runnable {
+ /**
+ * The queue of {@link FlowStage} updates that are scheduled for immediate execution.
+ */
+ private final FlowStageQueue queue = new FlowStageQueue(256);
+
+ /**
+ * A priority queue containing the {@link FlowStage} updates to be scheduled in the future.
+ */
+ private final FlowTimerQueue timerQueue = new FlowTimerQueue(256);
+
+ /**
+ * The stack of engine invocations to occur in the future.
+ */
+ private final InvocationStack futureInvocations = new InvocationStack(256);
+
+ /**
+ * A flag to indicate that the engine is active.
+ */
+ private boolean active;
+
+ private final CoroutineContext coroutineContext;
+ private final Clock clock;
+ private final Delay delay;
+
+ /**
+ * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}.
+ */
+ public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) {
+ return new FlowEngine(coroutineContext, clock);
+ }
+
+ FlowEngine(CoroutineContext coroutineContext, Clock clock) {
+ this.coroutineContext = coroutineContext;
+ this.clock = clock;
+
+ CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key;
+ this.delay = (Delay) coroutineContext.get(key);
+ }
+
+ /**
+ * Obtain the (virtual) {@link Clock} driving the simulation.
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Return a new {@link FlowGraph} that can be used to build a flow network.
+ */
+ public FlowGraph newGraph() {
+ return new RootGraph(this);
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle.
+ * <p>
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed.
+ */
+ void scheduleImmediate(long now, FlowStage ctx) {
+ scheduleImmediateInContext(ctx);
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (active) {
+ return;
+ }
+
+ trySchedule(futureInvocations, now, now);
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle.
+ * <p>
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed.
+ * <p>
+ * This method should only be invoked while inside an engine cycle.
+ */
+ void scheduleImmediateInContext(FlowStage ctx) {
+ queue.add(ctx);
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated at its updated deadline.
+ */
+ void scheduleDelayed(FlowStage ctx) {
+ scheduleDelayedInContext(ctx);
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (active) {
+ return;
+ }
+
+ long deadline = timerQueue.peekDeadline();
+ if (deadline != Long.MAX_VALUE) {
+ trySchedule(futureInvocations, clock.millis(), deadline);
+ }
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated at its updated deadline.
+ * <p>
+ * This method should only be invoked while inside an engine cycle.
+ */
+ void scheduleDelayedInContext(FlowStage ctx) {
+ FlowTimerQueue timerQueue = this.timerQueue;
+ timerQueue.enqueue(ctx);
+ }
+
+ /**
+ * Run all the enqueued actions for the specified timestamp (<code>now</code>).
+ */
+ private void doRunEngine(long now) {
+ final FlowStageQueue queue = this.queue;
+ final FlowTimerQueue timerQueue = this.timerQueue;
+
+ try {
+ // Mark the engine as active to prevent concurrent calls to this method
+ active = true;
+
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ final FlowStage ctx = timerQueue.poll(now);
+ if (ctx == null) {
+ break;
+ }
+
+ ctx.onUpdate(now);
+ }
+
+ // Execute all immediate updates
+ while (true) {
+ final FlowStage ctx = queue.poll();
+ if (ctx == null) {
+ break;
+ }
+
+ ctx.onUpdate(now);
+ }
+ } finally {
+ active = false;
+ }
+
+ // Schedule an engine invocation for the next update to occur.
+ long headDeadline = timerQueue.peekDeadline();
+ if (headDeadline != Long.MAX_VALUE && headDeadline >= now) {
+ trySchedule(futureInvocations, now, headDeadline);
+ }
+ }
+
+ @Override
+ public void run() {
+ doRunEngine(futureInvocations.poll());
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param scheduled The queue of scheduled invocations.
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ */
+ private void trySchedule(InvocationStack scheduled, long now, long target) {
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (scheduled.tryAdd(target)) {
+ delay.invokeOnTimeout(target - now, this, coroutineContext);
+ }
+ }
+
+ /**
+ * Internal implementation of a root {@link FlowGraph}.
+ */
+ private static final class RootGraph implements FlowGraphInternal {
+ private final FlowEngine engine;
+ private final List<FlowStage> stages = new ArrayList<>();
+
+ public RootGraph(FlowEngine engine) {
+ this.engine = engine;
+ }
+
+ @Override
+ public FlowEngine getEngine() {
+ return engine;
+ }
+
+ @Override
+ public FlowStage newStage(FlowStageLogic logic) {
+ final FlowEngine engine = this.engine;
+ final FlowStage stage = new FlowStage(this, logic);
+ stages.add(stage);
+ long now = engine.getClock().millis();
+ stage.invalidate(now);
+ return stage;
+ }
+
+ @Override
+ public void connect(Outlet outlet, Inlet inlet) {
+ FlowGraphInternal.connect(this, outlet, inlet);
+ }
+
+ @Override
+ public void disconnect(Outlet outlet) {
+ FlowGraphInternal.disconnect(this, outlet);
+ }
+
+ @Override
+ public void disconnect(Inlet inlet) {
+ FlowGraphInternal.disconnect(this, inlet);
+ }
+
+ /**
+ * Internal method to remove the specified {@link FlowStage} from the graph.
+ */
+ @Override
+ public void detach(FlowStage stage) {
+ stages.remove(stage);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java
new file mode 100644
index 00000000..f45be6cd
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an
+ * amount of flow that cannot exceed the edge's capacity.
+ */
+public interface FlowGraph {
+ /**
+ * Return the {@link FlowEngine} driving the simulation of the graph.
+ */
+ FlowEngine getEngine();
+
+ /**
+ * Create a new {@link FlowStage} representing a node in the flow network.
+ *
+ * @param logic The logic for handling the events of the stage.
+ */
+ FlowStage newStage(FlowStageLogic logic);
+
+ /**
+ * Add an edge between the specified outlet port and inlet port in this graph.
+ *
+ * @param outlet The outlet of the source from which the flow originates.
+ * @param inlet The inlet of the sink that should receive the flow.
+ */
+ void connect(Outlet outlet, Inlet inlet);
+
+ /**
+ * Disconnect the specified {@link Outlet} (if connected).
+ *
+ * @param outlet The outlet to disconnect.
+ */
+ void disconnect(Outlet outlet);
+
+ /**
+ * Disconnect the specified {@link Inlet} (if connected).
+ *
+ * @param inlet The inlet to disconnect.
+ */
+ void disconnect(Inlet inlet);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java
new file mode 100644
index 00000000..0f608b60
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * Interface implemented by {@link FlowGraph} implementations.
+ */
+interface FlowGraphInternal extends FlowGraph {
+ /**
+ * Internal method to remove the specified {@link FlowStage} from the graph.
+ */
+ void detach(FlowStage stage);
+
+ /**
+ * Helper method to connect an outlet to an inlet.
+ */
+ static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) {
+ if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) {
+ throw new IllegalArgumentException("Invalid outlet or inlet passed to graph");
+ }
+
+ InPort inPort = (InPort) inlet;
+ OutPort outPort = (OutPort) outlet;
+
+ if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) {
+ throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
+ } else if (outPort.input != null || inPort.output != null) {
+ throw new IllegalStateException("Inlet or outlet already connected");
+ }
+
+ outPort.input = inPort;
+ inPort.output = outPort;
+
+ inPort.connect();
+ outPort.connect();
+ }
+
+ /**
+ * Helper method to disconnect an outlet.
+ */
+ static void disconnect(FlowGraph graph, Outlet outlet) {
+ if (!(outlet instanceof OutPort)) {
+ throw new IllegalArgumentException("Invalid outlet passed to graph");
+ }
+
+ OutPort outPort = (OutPort) outlet;
+
+ if (!graph.equals(outPort.getGraph())) {
+ throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
+ }
+
+ outPort.cancel(null);
+ outPort.complete();
+ }
+
+ /**
+ * Helper method to disconnect an inlet.
+ */
+ static void disconnect(FlowGraph graph, Inlet inlet) {
+ if (!(inlet instanceof InPort)) {
+ throw new IllegalArgumentException("Invalid outlet passed to graph");
+ }
+
+ InPort inPort = (InPort) inlet;
+
+ if (!graph.equals(inPort.getGraph())) {
+ throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
+ }
+
+ inPort.finish(null);
+ inPort.cancel(null);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
new file mode 100644
index 00000000..4d098043
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FlowStage} represents a node in a {@link FlowGraph}.
+ */
+public final class FlowStage {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class);
+
+ /**
+ * States of the flow stage.
+ */
+ private static final int STAGE_PENDING = 0; // Stage is pending to be started
+
+ private static final int STAGE_ACTIVE = 1; // Stage is actively running
+ private static final int STAGE_CLOSED = 2; // Stage is closed
+ private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage
+
+ /**
+ * Flags of the flow connection
+ */
+ private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated
+
+ private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed
+ private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active
+ private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending
+
+ /**
+ * The flags representing the state and pending actions for the stage.
+ */
+ private int flags = STAGE_PENDING;
+
+ /**
+ * The deadline of the stage after which an update should run.
+ */
+ long deadline = Long.MAX_VALUE;
+
+ /**
+ * The index of the timer in the {@link FlowTimerQueue}.
+ */
+ int timerIndex = -1;
+
+ final Clock clock;
+ private final FlowStageLogic logic;
+ final FlowGraphInternal parentGraph;
+ private final FlowEngine engine;
+
+ private final Map<String, InPort> inlets = new HashMap<>();
+ private final Map<String, OutPort> outlets = new HashMap<>();
+ private int nextInlet = 0;
+ private int nextOutlet = 0;
+
+ /**
+ * Construct a new {@link FlowStage} instance.
+ *
+ * @param parentGraph The {@link FlowGraph} this stage belongs to.
+ * @param logic The logic of the stage.
+ */
+ FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) {
+ this.parentGraph = parentGraph;
+ this.logic = logic;
+ this.engine = parentGraph.getEngine();
+ this.clock = engine.getClock();
+ }
+
+ /**
+ * Return the {@link FlowGraph} to which this stage belongs.
+ */
+ public FlowGraph getGraph() {
+ return parentGraph;
+ }
+
+ /**
+ * Return the {@link Inlet} (an in-going edge) with the specified <code>name</code> for this {@link FlowStage}.
+ * If an inlet with that name does not exist, a new one is allocated for the stage.
+ *
+ * @param name The name of the inlet.
+ * @return The {@link InPort} representing an {@link Inlet} with the specified <code>name</code>.
+ */
+ public InPort getInlet(String name) {
+ return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++));
+ }
+
+ /**
+ * Return the {@link Outlet} (an out-going edge) with the specified <code>name</code> for this {@link FlowStage}.
+ * If an outlet with that name does not exist, a new one is allocated for the stage.
+ *
+ * @param name The name of the outlet.
+ * @return The {@link OutPort} representing an {@link Outlet} with the specified <code>name</code>.
+ */
+ public OutPort getOutlet(String name) {
+ return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++));
+ }
+
+ /**
+ * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch).
+ */
+ public long getDeadline() {
+ return deadline;
+ }
+
+ /**
+ * Set the deadline of the {@link FlowStage}'s timer.
+ *
+ * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted.
+ */
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+
+ if ((flags & STAGE_UPDATE_ACTIVE) == 0) {
+ // Update the timer queue with the new deadline
+ engine.scheduleDelayed(this);
+ }
+ }
+
+ /**
+ * Invalidate the {@link FlowStage} forcing the stage to update.
+ */
+ public void invalidate() {
+ int flags = this.flags;
+
+ if ((flags & STAGE_UPDATE_ACTIVE) == 0) {
+ scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE);
+ }
+ }
+
+ /**
+ * Close the {@link FlowStage} and disconnect all inlets and outlets.
+ */
+ public void close() {
+ int flags = this.flags;
+
+ if ((flags & STAGE_STATE) == STAGE_CLOSED) {
+ return;
+ }
+
+ // Toggle the close bit. In case no update is active, schedule a new update.
+ if ((flags & STAGE_UPDATE_ACTIVE) != 0) {
+ this.flags = flags | STAGE_CLOSE;
+ } else {
+ scheduleImmediate(clock.millis(), flags | STAGE_CLOSE);
+ }
+ }
+
+ /**
+ * Update the state of the flow stage.
+ *
+ * @param now The current virtual timestamp.
+ */
+ void onUpdate(long now) {
+ int flags = this.flags;
+ int state = flags & STAGE_STATE;
+
+ if (state == STAGE_ACTIVE) {
+ doUpdate(now, flags);
+ } else if (state == STAGE_PENDING) {
+ doStart(now, flags);
+ }
+ }
+
+ /**
+ * Invalidate the {@link FlowStage} forcing the stage to update.
+ *
+ * <p>
+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to
+ * prevent having to re-query the clock. This method should not be called during an update.
+ */
+ void invalidate(long now) {
+ scheduleImmediate(now, flags | STAGE_INVALIDATE);
+ }
+
+ /**
+ * Schedule an immediate update for this stage.
+ */
+ private void scheduleImmediate(long now, int flags) {
+ // In case an immediate update is already scheduled, no need to do anything
+ if ((flags & STAGE_UPDATE_PENDING) != 0) {
+ this.flags = flags;
+ return;
+ }
+
+ // Mark the stage that there is an update pending
+ this.flags = flags | STAGE_UPDATE_PENDING;
+
+ engine.scheduleImmediate(now, this);
+ }
+
+ /**
+ * Start the stage.
+ */
+ private void doStart(long now, int flags) {
+ // Update state before calling into the outside world, so it observes a consistent state
+ flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE;
+
+ doUpdate(now, flags);
+ }
+
+ /**
+ * Update the state of the stage.
+ */
+ private void doUpdate(long now, int flags) {
+ long deadline = this.deadline;
+ long newDeadline = deadline;
+
+ // Update the stage if:
+ // (1) the timer of the stage has expired.
+ // (2) one of the input ports is pushed,
+ // (3) one of the output ports is pulled,
+ if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) {
+ // Update state before calling into the outside world, so it observes a consistent state
+ this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE;
+
+ try {
+ newDeadline = logic.onUpdate(this, now);
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = this.flags;
+ } catch (Exception e) {
+ doFail(e);
+ }
+ }
+
+ // Check whether the stage is marked as closing.
+ if ((flags & STAGE_CLOSE) != 0) {
+ doClose(flags, null);
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = this.flags;
+ }
+
+ // Indicate that no update is active anymore and flush the flags
+ this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING);
+ this.deadline = newDeadline;
+
+ // Update the timer queue with the new deadline
+ engine.scheduleDelayedInContext(this);
+ }
+
+ /**
+ * This method is invoked when an uncaught exception is caught by the engine. When this happens, the
+ * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs.
+ */
+ void doFail(Throwable cause) {
+ LOGGER.warn("Uncaught exception (closing stage)", cause);
+
+ doClose(flags, cause);
+ }
+
+ /**
+ * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure.
+ */
+ private void doClose(int flags, Throwable cause) {
+ // Mark the stage as closed
+ this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED;
+
+ // Remove stage from parent graph
+ parentGraph.detach(this);
+
+ // Remove stage from the timer queue
+ setDeadline(Long.MAX_VALUE);
+
+ // Cancel all input ports
+ for (InPort port : inlets.values()) {
+ if (port != null) {
+ port.cancel(cause);
+ }
+ }
+
+ // Cancel all output ports
+ for (OutPort port : outlets.values()) {
+ if (port != null) {
+ port.fail(cause);
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java
new file mode 100644
index 00000000..70986a35
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via
+ * out-going flows based on its potential inputs.
+ */
+public interface FlowStageLogic {
+ /**
+ * This method is invoked when the one of the stage's inlets or outlets is invalidated.
+ *
+ * @param ctx The context in which the stage runs.
+ * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring.
+ * @return The next deadline for the stage.
+ */
+ long onUpdate(FlowStage ctx, long now);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java
new file mode 100644
index 00000000..56ec7702
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+
+/**
+ * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s
+ * that have been updated during the engine cycle and should converge.
+ * <p>
+ * By using a specialized class, we reduce the overhead caused by type-erasure.
+ */
+final class FlowStageQueue {
+ /**
+ * The array of elements in the queue.
+ */
+ private FlowStage[] elements;
+
+ private int head = 0;
+ private int tail = 0;
+
+ public FlowStageQueue(int initialCapacity) {
+ elements = new FlowStage[initialCapacity];
+ }
+
+ /**
+ * Add the specified context to the queue.
+ */
+ void add(FlowStage ctx) {
+ final FlowStage[] es = elements;
+ int tail = this.tail;
+
+ es[tail] = ctx;
+
+ tail = inc(tail, es.length);
+ this.tail = tail;
+
+ if (head == tail) {
+ doubleCapacity();
+ }
+ }
+
+ /**
+ * Remove a {@link FlowStage} from the queue or <code>null</code> if the queue is empty.
+ */
+ FlowStage poll() {
+ final FlowStage[] es = elements;
+ int head = this.head;
+ FlowStage ctx = es[head];
+
+ if (ctx != null) {
+ es[head] = null;
+ this.head = inc(head, es.length);
+ }
+
+ return ctx;
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private void doubleCapacity() {
+ int oldCapacity = elements.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+ if (newCapacity < 0) {
+ throw new IllegalStateException("Sorry, deque too big");
+ }
+
+ final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity);
+
+ // Exceptionally, here tail == head needs to be disambiguated
+ if (tail < head || (tail == head && es[head] != null)) {
+ // wrap around; slide first leg forward to end of array
+ int newSpace = newCapacity - oldCapacity;
+ System.arraycopy(es, head, es, head + newSpace, oldCapacity - head);
+ for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null;
+ }
+ }
+
+ /**
+ * Circularly increments i, mod modulus.
+ * Precondition and postcondition: 0 <= i < modulus.
+ */
+ private static int inc(int i, int modulus) {
+ if (++i >= modulus) i = 0;
+ return i;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java
new file mode 100644
index 00000000..4b746202
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.util.Arrays;
+
+/**
+ * A specialized priority queue for timers of {@link FlowStageLogic}s.
+ * <p>
+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
+ * being generic.
+ */
+final class FlowTimerQueue {
+ /**
+ * Array representation of binary heap of {@link FlowStage} instances.
+ */
+ private FlowStage[] queue;
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private int size = 0;
+
+ /**
+ * Construct a {@link FlowTimerQueue} with the specified initial capacity.
+ *
+ * @param initialCapacity The initial capacity of the queue.
+ */
+ public FlowTimerQueue(int initialCapacity) {
+ this.queue = new FlowStage[initialCapacity];
+ }
+
+ /**
+ * Enqueue a timer for the specified context or update the existing timer.
+ */
+ void enqueue(FlowStage ctx) {
+ FlowStage[] es = queue;
+ int k = ctx.timerIndex;
+
+ if (ctx.deadline != Long.MAX_VALUE) {
+ if (k >= 0) {
+ update(es, ctx, k);
+ } else {
+ add(es, ctx);
+ }
+ } else if (k >= 0) {
+ delete(es, k);
+ }
+ }
+
+ /**
+ * Retrieve the head of the queue if its deadline does not exceed <code>now</code>.
+ *
+ * @param now The timestamp that the deadline of the head of the queue should not exceed.
+ * @return The head of the queue if its deadline does not exceed <code>now</code>, otherwise <code>null</code>.
+ */
+ FlowStage poll(long now) {
+ int size = this.size;
+ if (size == 0) {
+ return null;
+ }
+
+ final FlowStage[] es = queue;
+ final FlowStage head = es[0];
+
+ if (now < head.deadline) {
+ return null;
+ }
+
+ int n = size - 1;
+ this.size = n;
+ final FlowStage next = es[n];
+ es[n] = null; // Clear the last element of the queue
+
+ if (n > 0) {
+ siftDown(0, next, es, n);
+ }
+
+ head.timerIndex = -1;
+ return head;
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ */
+ long peekDeadline() {
+ if (size > 0) {
+ return queue[0].deadline;
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Add a new entry to the queue.
+ */
+ private void add(FlowStage[] es, FlowStage ctx) {
+ int i = size;
+
+ if (i >= es.length) {
+ // Re-fetch the resized array
+ es = grow();
+ }
+
+ siftUp(i, ctx, es);
+
+ size = i + 1;
+ }
+
+ /**
+ * Update the deadline of an existing entry in the queue.
+ */
+ private void update(FlowStage[] es, FlowStage ctx, int k) {
+ if (k > 0) {
+ int parent = (k - 1) >>> 1;
+ if (es[parent].deadline > ctx.deadline) {
+ siftUp(k, ctx, es);
+ return;
+ }
+ }
+
+ siftDown(k, ctx, es, size);
+ }
+
+ /**
+ * Deadline an entry from the queue.
+ */
+ private void delete(FlowStage[] es, int k) {
+ int s = --size;
+ if (s == k) {
+ es[k] = null; // Element is last in the queue
+ } else {
+ FlowStage moved = es[s];
+ es[s] = null;
+
+ siftDown(k, moved, es, s);
+
+ if (es[k] == moved) {
+ siftUp(k, moved, es);
+ }
+ }
+ }
+
+ /**
+ * Increases the capacity of the array.
+ */
+ private FlowStage[] grow() {
+ FlowStage[] queue = this.queue;
+ int oldCapacity = queue.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+
+ queue = Arrays.copyOf(queue, newCapacity);
+ this.queue = queue;
+ return queue;
+ }
+
+ private static void siftUp(int k, FlowStage key, FlowStage[] es) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ FlowStage e = es[parent];
+ if (key.deadline >= e.deadline) break;
+ es[k] = e;
+ e.timerIndex = k;
+ k = parent;
+ }
+ es[k] = key;
+ key.timerIndex = k;
+ }
+
+ private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) {
+ int half = n >>> 1; // loop while a non-leaf
+ while (k < half) {
+ int child = (k << 1) + 1; // assume left child is least
+ FlowStage c = es[child];
+ int right = child + 1;
+ if (right < n && c.deadline > es[right].deadline) c = es[child = right];
+
+ if (key.deadline <= c.deadline) break;
+
+ es[k] = c;
+ c.timerIndex = k;
+ k = child;
+ }
+
+ es[k] = key;
+ key.timerIndex = k;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java
new file mode 100644
index 00000000..839b01db
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}.
+ */
+public interface InHandler {
+ /**
+ * Return the actual flow rate over the input port.
+ *
+ * @param port The input port to which the flow was pushed.
+ * @return The actual flow rate over the port.
+ */
+ default float getRate(InPort port) {
+ return Math.min(port.getDemand(), port.getCapacity());
+ }
+
+ /**
+ * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet.
+ *
+ * @param port The input port to which the flow was pushed.
+ * @param demand The rate of flow the output attempted to push to the port.
+ */
+ void onPush(InPort port, float demand);
+
+ /**
+ * This method is invoked when the input port is finished.
+ *
+ * @param port The input port that has finished.
+ * @param cause The cause of the input port being finished or <code>null</code> if the port completed successfully.
+ */
+ void onUpstreamFinish(InPort port, Throwable cause);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java
new file mode 100644
index 00000000..9d5b4bef
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * A collection of common {@link InHandler} implementations.
+ */
+public class InHandlers {
+ /**
+ * Prevent construction of this class.
+ */
+ private InHandlers() {}
+
+ /**
+ * Return an {@link InHandler} that does nothing.
+ */
+ public static InHandler noop() {
+ return NoopInHandler.INSTANCE;
+ }
+
+ /**
+ * No-op implementation of {@link InHandler}.
+ */
+ private static final class NoopInHandler implements InHandler {
+ public static final InHandler INSTANCE = new NoopInHandler();
+
+ @Override
+ public void onPush(InPort port, float demand) {}
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {}
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
new file mode 100644
index 00000000..fba12aaf
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.Objects;
+
+/**
+ * A port that consumes a flow.
+ * <p>
+ * Input ports are represented as in-going edges in the flow graph.
+ */
+public final class InPort implements Inlet {
+ private final int id;
+
+ private float capacity;
+ private float demand;
+
+ private boolean mask;
+
+ OutPort output;
+ private InHandler handler = InHandlers.noop();
+ private final Clock clock;
+ private final String name;
+ private final FlowStage stage;
+
+ InPort(FlowStage stage, String name, int id) {
+ this.name = name;
+ this.id = id;
+ this.stage = stage;
+ this.clock = stage.clock;
+ }
+
+ @Override
+ public FlowGraph getGraph() {
+ return stage.parentGraph;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Return the identifier of the {@link InPort} with respect to its stage.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Return the current capacity of the input port.
+ */
+ public float getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Return the current demand of flow of the input port.
+ */
+ public float getDemand() {
+ return demand;
+ }
+
+ /**
+ * Return the current rate of flow of the input port.
+ */
+ public float getRate() {
+ return handler.getRate(this);
+ }
+
+ /**
+ * Pull the flow with the specified <code>capacity</code> from the input port.
+ *
+ * @param capacity The maximum throughput that the stage can receive from the input port.
+ */
+ public void pull(float capacity) {
+ this.capacity = capacity;
+
+ OutPort output = this.output;
+ if (output != null) {
+ output.pull(capacity);
+ }
+ }
+
+ /**
+ * Return the current {@link InHandler} of the input port.
+ */
+ public InHandler getHandler() {
+ return handler;
+ }
+
+ /**
+ * Set the {@link InHandler} of the input port.
+ */
+ public void setHandler(InHandler handler) {
+ this.handler = handler;
+ }
+
+ /**
+ * Return the mask of this port.
+ * <p>
+ * Stages ignore events originating from masked ports.
+ */
+ public boolean getMask() {
+ return mask;
+ }
+
+ /**
+ * (Un)mask the port.
+ */
+ public void setMask(boolean mask) {
+ this.mask = mask;
+ }
+
+ /**
+ * Disconnect the input port from its (potentially) connected outlet.
+ * <p>
+ * The inlet can still be used and re-connected to another outlet.
+ *
+ * @param cause The cause for disconnecting the port or <code>null</code> when no more flow is needed.
+ */
+ public void cancel(Throwable cause) {
+ demand = 0.f;
+
+ OutPort output = this.output;
+ if (output != null) {
+ this.output = null;
+ output.input = null;
+ output.cancel(cause);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InPort port = (InPort) o;
+ return stage.equals(port.stage) && name.equals(port.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(stage.parentGraph, name);
+ }
+
+ /**
+ * This method is invoked when the inlet is connected to an outlet.
+ */
+ void connect() {
+ OutPort output = this.output;
+ output.pull(capacity);
+ }
+
+ /**
+ * Push a flow from an outlet to this inlet.
+ *
+ * @param demand The rate of flow to push.
+ */
+ void push(float demand) {
+ // No-op when the rate is unchanged
+ if (this.demand == demand) {
+ return;
+ }
+
+ try {
+ handler.onPush(this, demand);
+ this.demand = demand;
+
+ if (!mask) {
+ stage.invalidate(clock.millis());
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+
+ /**
+ * This method is invoked by the connected {@link OutPort} when it finishes.
+ */
+ void finish(Throwable cause) {
+ try {
+ long now = clock.millis();
+ handler.onUpstreamFinish(this, cause);
+ this.demand = 0.f;
+
+ if (!mask) {
+ stage.invalidate(now);
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java
new file mode 100644
index 00000000..4a9ea6a5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * An in-going edge in a {@link FlowGraph}.
+ */
+public interface Inlet {
+ /**
+ * Return the {@link FlowGraph} to which the inlet is exposed.
+ */
+ FlowGraph getGraph();
+
+ /**
+ * Return the name of the inlet.
+ */
+ String getName();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java
new file mode 100644
index 00000000..a5b5114b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.util.Arrays;
+
+/**
+ * A specialized monotonic stack implementation for tracking the scheduled engine invocations.
+ * <p>
+ * By using a specialized class, we reduce the overhead caused by type-erasure.
+ */
+final class InvocationStack {
+ /**
+ * The array of elements in the stack.
+ */
+ private long[] elements;
+
+ private int head = -1;
+
+ public InvocationStack(int initialCapacity) {
+ elements = new long[initialCapacity];
+ Arrays.fill(elements, Long.MIN_VALUE);
+ }
+
+ /**
+ * Try to add the specified invocation to the monotonic stack.
+ *
+ * @param invocation The timestamp of the invocation.
+ * @return <code>true</code> if the invocation was added, <code>false</code> otherwise.
+ */
+ boolean tryAdd(long invocation) {
+ final long[] es = elements;
+ int head = this.head;
+
+ if (head < 0 || es[head] > invocation) {
+ es[head + 1] = invocation;
+ this.head = head + 1;
+
+ if (head + 2 == es.length) {
+ doubleCapacity();
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty.
+ */
+ long poll() {
+ final long[] es = elements;
+ int head = this.head--;
+
+ if (head >= 0) {
+ return es[head];
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private void doubleCapacity() {
+ int oldCapacity = elements.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+ if (newCapacity < 0) {
+ throw new IllegalStateException("Sorry, deque too big");
+ }
+
+ elements = Arrays.copyOf(elements, newCapacity);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java
new file mode 100644
index 00000000..723c6d6b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}.
+ */
+public interface OutHandler {
+ /**
+ * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet.
+ *
+ * @param port The output port of which the capacity was changed.
+ * @param capacity The new capacity of the outlet.
+ */
+ void onPull(OutPort port, float capacity);
+
+ /**
+ * This method is invoked when the output port no longer accepts any flow.
+ * <p>
+ * After this callback no other callbacks will be called for this port.
+ *
+ * @param port The outlet that no longer accepts any flow.
+ * @param cause The cause of the output port no longer accepting any flow or <code>null</code> if the port closed
+ * successfully.
+ */
+ void onDownstreamFinish(OutPort port, Throwable cause);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java
new file mode 100644
index 00000000..8fbfda0d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * A collection of common {@link OutHandler} implementations.
+ */
+public class OutHandlers {
+ /**
+ * Prevent construction of this class.
+ */
+ private OutHandlers() {}
+
+ /**
+ * Return an {@link OutHandler} that does nothing.
+ */
+ public static OutHandler noop() {
+ return NoopOutHandler.INSTANCE;
+ }
+
+ /**
+ * No-op implementation of {@link OutHandler}.
+ */
+ private static final class NoopOutHandler implements OutHandler {
+ public static final OutHandler INSTANCE = new NoopOutHandler();
+
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {}
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
new file mode 100644
index 00000000..332296a0
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.Objects;
+
+/**
+ * A port that outputs a flow.
+ * <p>
+ * Output ports are represented as out-going edges in the flow graph.
+ */
+public final class OutPort implements Outlet {
+ private final int id;
+
+ private float capacity;
+ private float demand;
+
+ private boolean mask;
+
+ InPort input;
+ private OutHandler handler = OutHandlers.noop();
+ private final String name;
+ private final FlowStage stage;
+ private final Clock clock;
+
+ OutPort(FlowStage stage, String name, int id) {
+ this.name = name;
+ this.id = id;
+ this.stage = stage;
+ this.clock = stage.clock;
+ }
+
+ @Override
+ public FlowGraph getGraph() {
+ return stage.parentGraph;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Return the identifier of the {@link OutPort} with respect to its stage.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Return the capacity of the output port.
+ */
+ public float getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Return the current demand of flow of the output port.
+ */
+ public float getDemand() {
+ return demand;
+ }
+
+ /**
+ * Return the current rate of flow of the input port.
+ */
+ public float getRate() {
+ InPort input = this.input;
+ if (input != null) {
+ return input.getRate();
+ }
+
+ return 0.f;
+ }
+
+ /**
+ * Return the current {@link OutHandler} of the output port.
+ */
+ public OutHandler getHandler() {
+ return handler;
+ }
+
+ /**
+ * Set the {@link OutHandler} of the output port.
+ */
+ public void setHandler(OutHandler handler) {
+ this.handler = handler;
+ }
+
+ /**
+ * Return the mask of this port.
+ * <p>
+ * Stages ignore events originating from masked ports.
+ */
+ public boolean getMask() {
+ return mask;
+ }
+
+ /**
+ * (Un)mask the port.
+ */
+ public void setMask(boolean mask) {
+ this.mask = mask;
+ }
+
+ /**
+ * Push the given flow rate over output port.
+ *
+ * @param rate The rate of the flow to push.
+ */
+ public void push(float rate) {
+ demand = rate;
+ InPort input = this.input;
+
+ if (input != null) {
+ input.push(rate);
+ }
+ }
+
+ /**
+ * Signal to the downstream port that the output has completed successfully and disconnect the port from its input.
+ * <p>
+ * The output port can still be used and re-connected to another input.
+ */
+ public void complete() {
+ fail(null);
+ }
+
+ /**
+ * Signal a failure to the downstream port and disconnect the port from its input.
+ * <p>
+ * The output can still be used and re-connected to another input.
+ */
+ public void fail(Throwable cause) {
+ capacity = 0.f;
+
+ InPort input = this.input;
+ if (input != null) {
+ this.input = null;
+ input.output = null;
+ input.finish(cause);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OutPort port = (OutPort) o;
+ return stage.equals(port.stage) && name.equals(port.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(stage.parentGraph, name);
+ }
+
+ /**
+ * This method is invoked when the outlet is connected to an inlet.
+ */
+ void connect() {
+ input.push(demand);
+ }
+
+ /**
+ * Pull from this outlet with a specified capacity.
+ *
+ * @param capacity The capacity of the inlet.
+ */
+ void pull(float capacity) {
+ // No-op when outlet is not active or the rate is unchanged
+ if (this.capacity == capacity) {
+ return;
+ }
+
+ try {
+ handler.onPull(this, capacity);
+ this.capacity = capacity;
+
+ if (!mask) {
+ stage.invalidate(clock.millis());
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+
+ /**
+ * This method is invoked by the connected {@link InPort} when downstream cancels the connection.
+ */
+ void cancel(Throwable cause) {
+ try {
+ handler.onDownstreamFinish(this, cause);
+ this.capacity = 0.f;
+
+ if (!mask) {
+ stage.invalidate(clock.millis());
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java
new file mode 100644
index 00000000..32e19a3b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * An out-going edge in a {@link FlowGraph}.
+ */
+public interface Outlet {
+ /**
+ * Return the {@link FlowGraph} to which the outlet is exposed.
+ */
+ FlowGraph getGraph();
+
+ /**
+ * Return the name of the outlet.
+ */
+ String getName();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java
new file mode 100644
index 00000000..1a99d0cf
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.Inlet;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs.
+ */
+public interface FlowMultiplexer {
+ /**
+ * Return the number of active inputs on this multiplexer.
+ */
+ int getInputCount();
+
+ /**
+ * Allocate a new input on this multiplexer with the specified capacity..
+ *
+ * @return The identifier of the input for this stage.
+ */
+ Inlet newInput();
+
+ /**
+ * Release the input at the specified slot.
+ *
+ * @param inlet The inlet to release.
+ */
+ void releaseInput(Inlet inlet);
+
+ /**
+ * Return the number of active outputs on this multiplexer.
+ */
+ int getOutputCount();
+
+ /**
+ * Allocate a new output on this multiplexer.
+ *
+ * @return The outlet for this stage.
+ */
+ Outlet newOutput();
+
+ /**
+ * Release the output at the specified slot.
+ *
+ * @param outlet The outlet to release.
+ */
+ void releaseOutput(Outlet outlet);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java
new file mode 100644
index 00000000..ca0639f5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.InHandler;
+import org.opendc.simulator.flow2.InPort;
+import org.opendc.simulator.flow2.Inlet;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs
+ * using max-min fair sharing.
+ * <p>
+ * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined
+ * output capacity, but allows individual inputs to use more capacity if there is still capacity left.
+ */
+public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic {
+ private final FlowStage stage;
+ private final BitSet activeInputs;
+ private final BitSet activeOutputs;
+
+ private float capacity = 0.f;
+ private float demand = 0.f;
+ private float rate = 0.f;
+
+ private InPort[] inlets;
+ private long[] inputs;
+ private float[] rates;
+ private OutPort[] outlets;
+
+ private final MultiplexerInHandler inHandler = new MultiplexerInHandler();
+ private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler();
+
+ /**
+ * Construct a {@link MaxMinFlowMultiplexer} instance.
+ *
+ * @param graph The {@link FlowGraph} to add the multiplexer to.
+ */
+ public MaxMinFlowMultiplexer(FlowGraph graph) {
+ this.stage = graph.newStage(this);
+ this.activeInputs = new BitSet();
+ this.activeOutputs = new BitSet();
+
+ this.inlets = new InPort[4];
+ this.inputs = new long[4];
+ this.rates = new float[4];
+ this.outlets = new OutPort[4];
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ float capacity = this.capacity;
+ float demand = this.demand;
+ float rate = demand;
+
+ if (demand > capacity) {
+ rate = redistributeCapacity(inlets, inputs, rates, capacity);
+ }
+
+ if (this.rate != rate) {
+ // Only update the outputs if the output rate has changed
+ this.rate = rate;
+
+ changeRate(activeOutputs, outlets, capacity, rate);
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public int getInputCount() {
+ return activeInputs.length();
+ }
+
+ @Override
+ public Inlet newInput() {
+ final BitSet activeInputs = this.activeInputs;
+ int slot = activeInputs.nextClearBit(0);
+
+ InPort port = stage.getInlet("in" + slot);
+ port.setHandler(inHandler);
+ port.pull(this.capacity);
+
+ InPort[] inlets = this.inlets;
+ if (slot >= inlets.length) {
+ int newLength = inlets.length + (inlets.length >> 1);
+ inlets = Arrays.copyOf(inlets, newLength);
+ inputs = Arrays.copyOf(inputs, newLength);
+ rates = Arrays.copyOf(rates, newLength);
+ this.inlets = inlets;
+ }
+ inlets[slot] = port;
+
+ activeInputs.set(slot);
+ return port;
+ }
+
+ @Override
+ public void releaseInput(Inlet inlet) {
+ InPort port = (InPort) inlet;
+
+ activeInputs.clear(port.getId());
+ port.cancel(null);
+ }
+
+ @Override
+ public int getOutputCount() {
+ return activeOutputs.length();
+ }
+
+ @Override
+ public Outlet newOutput() {
+ final BitSet activeOutputs = this.activeOutputs;
+ int slot = activeOutputs.nextClearBit(0);
+
+ OutPort port = stage.getOutlet("out" + slot);
+ port.setHandler(outHandler);
+
+ OutPort[] outlets = this.outlets;
+ if (slot >= outlets.length) {
+ int newLength = outlets.length + (outlets.length >> 1);
+ outlets = Arrays.copyOf(outlets, newLength);
+ this.outlets = outlets;
+ }
+ outlets[slot] = port;
+
+ activeOutputs.set(slot);
+ return port;
+ }
+
+ @Override
+ public void releaseOutput(Outlet outlet) {
+ OutPort port = (OutPort) outlet;
+ activeInputs.clear(port.getId());
+ port.complete();
+ }
+
+ /**
+ * Helper function to redistribute the specified capacity across the inlets.
+ */
+ private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) {
+ // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
+ // constrained capacity across the inputs.
+ for (int i = 0; i < inputs.length; i++) {
+ InPort inlet = inlets[i];
+ if (inlet == null) {
+ break;
+ }
+
+ inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL);
+ }
+ Arrays.sort(inputs);
+
+ float availableCapacity = capacity;
+ int inputSize = inputs.length;
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ for (int i = 0; i < inputs.length; i++) {
+ long v = inputs[i];
+ int slot = (int) v;
+ float d = Float.intBitsToFloat((int) (v >> 32));
+
+ if (d == 0.0) {
+ continue;
+ }
+
+ float availableShare = availableCapacity / (inputSize - i);
+ float r = Math.min(d, availableShare);
+
+ rates[slot] = r;
+ availableCapacity -= r;
+ }
+
+ return capacity - availableCapacity;
+ }
+
+ /**
+ * Helper method to change the rate of the outlets.
+ */
+ private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) {
+ // Divide the requests over the available capacity of the input resources fairly
+ for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) {
+ OutPort outlet = outlets[i];
+ float fraction = outlet.getCapacity() / capacity;
+ outlet.push(rate * fraction);
+ }
+ }
+
+ /**
+ * A {@link InHandler} implementation for the multiplexer inputs.
+ */
+ private class MultiplexerInHandler implements InHandler {
+ @Override
+ public float getRate(InPort port) {
+ return rates[port.getId()];
+ }
+
+ @Override
+ public void onPush(InPort port, float demand) {
+ MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand;
+ rates[port.getId()] = demand;
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ MaxMinFlowMultiplexer.this.demand -= port.getDemand();
+ releaseInput(port);
+ rates[port.getId()] = 0.f;
+ }
+ }
+
+ /**
+ * A {@link OutHandler} implementation for the multiplexer outputs.
+ */
+ private class MultiplexerOutHandler implements OutHandler {
+ @Override
+ public void onPull(OutPort port, float capacity) {
+ float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity;
+ MaxMinFlowMultiplexer.this.capacity = newCapacity;
+ changeInletCapacity(newCapacity);
+ }
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity();
+ MaxMinFlowMultiplexer.this.capacity = newCapacity;
+ releaseOutput(port);
+ changeInletCapacity(newCapacity);
+ }
+
+ private void changeInletCapacity(float capacity) {
+ BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs;
+ InPort[] inlets = MaxMinFlowMultiplexer.this.inlets;
+
+ for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) {
+ inlets[i].pull(capacity);
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java
new file mode 100644
index 00000000..69c94708
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.sink;
+
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.Inlet;
+
+/**
+ * A {@link FlowStage} with a single input.
+ */
+public interface FlowSink {
+ /**
+ * Return the input of this {@link FlowSink}.
+ */
+ Inlet getInput();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java
new file mode 100644
index 00000000..fdfe5ee8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.sink;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.InHandler;
+import org.opendc.simulator.flow2.InPort;
+import org.opendc.simulator.flow2.Inlet;
+
+/**
+ * A sink with a fixed capacity.
+ */
+public final class SimpleFlowSink implements FlowSink, FlowStageLogic {
+ private final FlowStage stage;
+ private final InPort input;
+ private final Handler handler;
+
+ /**
+ * Construct a new {@link SimpleFlowSink} with the specified initial capacity.
+ *
+ * @param graph The graph to add the sink to.
+ * @param initialCapacity The initial capacity of the sink.
+ */
+ public SimpleFlowSink(FlowGraph graph, float initialCapacity) {
+ this.stage = graph.newStage(this);
+ this.handler = new Handler();
+ this.input = stage.getInlet("in");
+ this.input.pull(initialCapacity);
+ this.input.setMask(true);
+ this.input.setHandler(handler);
+ }
+
+ /**
+ * Return the {@link Inlet} of this sink.
+ */
+ @Override
+ public Inlet getInput() {
+ return input;
+ }
+
+ /**
+ * Return the capacity of the sink.
+ */
+ public float getCapacity() {
+ return input.getCapacity();
+ }
+
+ /**
+ * Update the capacity of the sink.
+ *
+ * @param capacity The new capacity to update the sink to.
+ */
+ public void setCapacity(float capacity) {
+ input.pull(capacity);
+ stage.invalidate();
+ }
+
+ /**
+ * Return the flow rate of the sink.
+ */
+ public float getRate() {
+ return input.getRate();
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ InPort input = this.input;
+ handler.rate = Math.min(input.getDemand(), input.getCapacity());
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * The {@link InHandler} implementation for the sink.
+ */
+ private static final class Handler implements InHandler {
+ float rate;
+
+ @Override
+ public float getRate(InPort port) {
+ return rate;
+ }
+
+ @Override
+ public void onPush(InPort port, float demand) {
+ float capacity = port.getCapacity();
+ rate = Math.min(demand, capacity);
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ rate = 0.f;
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java
new file mode 100644
index 00000000..2dcc66e4
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * An empty {@link FlowSource}.
+ */
+public final class EmptyFlowSource implements FlowSource, FlowStageLogic {
+ private final FlowStage stage;
+ private final OutPort output;
+
+ /**
+ * Construct a new {@link EmptyFlowSource}.
+ */
+ public EmptyFlowSource(FlowGraph graph) {
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ }
+
+ /**
+ * Return the {@link Outlet} of the source.
+ */
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ return Long.MAX_VALUE;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java
new file mode 100644
index 00000000..f9432c33
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowStage} with a single output.
+ */
+public interface FlowSource {
+ /**
+ * Return the output of this {@link FlowSource}.
+ */
+ Outlet getOutput();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java
new file mode 100644
index 00000000..a237c81e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+import java.util.function.Consumer;
+
+/**
+ * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization.
+ */
+public class RuntimeFlowSource implements FlowSource, FlowStageLogic {
+ private final float utilization;
+
+ private final FlowStage stage;
+ private final OutPort output;
+ private final Consumer<RuntimeFlowSource> completionHandler;
+
+ private long duration;
+ private long lastPull;
+
+ /**
+ * Construct a {@link RuntimeFlowSource} instance.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param duration The duration of the source.
+ * @param utilization The utilization of the capacity of the outlet.
+ * @param completionHandler A callback invoked when the source completes.
+ */
+ public RuntimeFlowSource(
+ FlowGraph graph, long duration, float utilization, Consumer<RuntimeFlowSource> completionHandler) {
+ if (duration <= 0) {
+ throw new IllegalArgumentException("Duration must be positive and non-zero");
+ }
+
+ if (utilization <= 0.0) {
+ throw new IllegalArgumentException("Utilization must be positive and non-zero");
+ }
+
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ this.output.setHandler(new OutHandler() {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ // Source cannot complete without re-connecting to another sink, so mark the source as completed
+ completionHandler.accept(RuntimeFlowSource.this);
+ }
+ });
+ this.duration = duration;
+ this.utilization = utilization;
+ this.completionHandler = completionHandler;
+ this.lastPull = graph.getEngine().getClock().millis();
+ }
+
+ /**
+ * Construct a new {@link RuntimeFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param duration The duration of the source.
+ * @param utilization The utilization of the capacity of the outlet.
+ */
+ public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) {
+ this(graph, duration, utilization, RuntimeFlowSource::close);
+ }
+
+ /**
+ * Return the {@link Outlet} of the source.
+ */
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ long lastPull = this.lastPull;
+ this.lastPull = now;
+
+ long delta = Math.max(0, now - lastPull);
+
+ OutPort output = this.output;
+ float limit = output.getCapacity() * utilization;
+ long duration = this.duration - delta;
+
+ if (duration <= 0) {
+ completionHandler.accept(this);
+ return Long.MAX_VALUE;
+ }
+
+ this.duration = duration;
+ output.push(limit);
+ return now + duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java
new file mode 100644
index 00000000..764a20a8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+import java.util.function.Consumer;
+
+/**
+ * A flow source that contains a fixed amount and is pushed with a given utilization.
+ */
+public final class SimpleFlowSource implements FlowSource, FlowStageLogic {
+ private final float utilization;
+ private float remainingAmount;
+ private long lastPull;
+
+ private final FlowStage stage;
+ private final OutPort output;
+ private final Consumer<SimpleFlowSource> completionHandler;
+
+ /**
+ * Construct a new {@link SimpleFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param amount The amount to transfer via the outlet.
+ * @param utilization The utilization of the capacity of the outlet.
+ * @param completionHandler A callback invoked when the source completes.
+ */
+ public SimpleFlowSource(
+ FlowGraph graph, float amount, float utilization, Consumer<SimpleFlowSource> completionHandler) {
+ if (amount < 0.0) {
+ throw new IllegalArgumentException("Amount must be non-negative");
+ }
+
+ if (utilization <= 0.0) {
+ throw new IllegalArgumentException("Utilization must be positive and non-zero");
+ }
+
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ this.output.setHandler(new OutHandler() {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ // Source cannot complete without re-connecting to another sink, so mark the source as completed
+ completionHandler.accept(SimpleFlowSource.this);
+ }
+ });
+ this.completionHandler = completionHandler;
+ this.utilization = utilization;
+ this.remainingAmount = amount;
+ this.lastPull = graph.getEngine().getClock().millis();
+ }
+
+ /**
+ * Construct a new {@link SimpleFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param amount The amount to transfer via the outlet.
+ * @param utilization The utilization of the capacity of the outlet.
+ */
+ public SimpleFlowSource(FlowGraph graph, float amount, float utilization) {
+ this(graph, amount, utilization, SimpleFlowSource::close);
+ }
+
+ /**
+ * Return the {@link Outlet} of the source.
+ */
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ long lastPull = this.lastPull;
+ this.lastPull = now;
+
+ long delta = Math.max(0, now - lastPull);
+
+ OutPort output = this.output;
+ float consumed = output.getRate() * delta / 1000.f;
+ float limit = output.getCapacity() * utilization;
+
+ float remainingAmount = this.remainingAmount - consumed;
+ this.remainingAmount = remainingAmount;
+
+ long duration = (long) Math.ceil(remainingAmount / limit * 1000);
+
+ if (duration <= 0) {
+ completionHandler.accept(this);
+ return Long.MAX_VALUE;
+ }
+
+ output.push(limit);
+ return now + duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java
new file mode 100644
index 00000000..96d43aef
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+import java.util.function.Consumer;
+
+/**
+ * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time.
+ */
+public final class TraceFlowSource implements FlowSource, FlowStageLogic {
+ private final OutPort output;
+ private final long[] deadlines;
+ private final float[] usages;
+ private final int size;
+ private int index;
+
+ private final FlowStage stage;
+ private final Consumer<TraceFlowSource> completionHandler;
+
+ /**
+ * Construct a {@link TraceFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which the source belongs.
+ * @param trace The {@link Trace} to replay.
+ * @param completionHandler The completion handler to invoke when the source finishes.
+ */
+ public TraceFlowSource(FlowGraph graph, Trace trace, Consumer<TraceFlowSource> completionHandler) {
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ this.output.setHandler(new OutHandler() {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ // Source cannot complete without re-connecting to another sink, so mark the source as completed
+ completionHandler.accept(TraceFlowSource.this);
+ }
+ });
+ this.deadlines = trace.deadlines;
+ this.usages = trace.usages;
+ this.size = trace.size;
+ this.completionHandler = completionHandler;
+ }
+
+ /**
+ * Construct a {@link TraceFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which the source belongs.
+ * @param trace The {@link Trace} to replay.
+ */
+ public TraceFlowSource(FlowGraph graph, Trace trace) {
+ this(graph, trace, TraceFlowSource::close);
+ }
+
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ int size = this.size;
+ int index = this.index;
+ long[] deadlines = this.deadlines;
+ long deadline;
+
+ do {
+ deadline = deadlines[index];
+ } while (deadline <= now && ++index < size);
+
+ if (index >= size) {
+ output.push(0.0f);
+ completionHandler.accept(this);
+ return Long.MAX_VALUE;
+ }
+
+ this.index = index;
+ float usage = usages[index];
+ output.push(usage);
+
+ return deadline;
+ }
+
+ /**
+ * A trace describes the workload over time.
+ */
+ public static final class Trace {
+ private final long[] deadlines;
+ private final float[] usages;
+ private final int size;
+
+ /**
+ * Construct a {@link Trace}.
+ *
+ * @param deadlines The deadlines of the trace fragments.
+ * @param usages The usages of the trace fragments.
+ * @param size The size of the trace.
+ */
+ public Trace(long[] deadlines, float[] usages, int size) {
+ this.deadlines = deadlines;
+ this.usages = usages;
+ this.size = size;
+ }
+
+ public long[] getDeadlines() {
+ return deadlines;
+ }
+
+ public float[] getUsages() {
+ return usages;
+ }
+
+ public int getSize() {
+ return size;
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
new file mode 100644
index 00000000..839835ce
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Smoke tests for the Flow API.
+ */
+class FlowEngineTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+ val sink = SimpleFlowSink(graph, 2.0f)
+
+ graph.connect(multiplexer.newOutput(), sink.input)
+
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(sourceA.output, multiplexer.newInput())
+ graph.connect(sourceB.output, multiplexer.newInput())
+ }
+
+ @Test
+ fun testConnectInvalidInlet() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val inlet = mockk<Inlet>()
+ val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ assertThrows<IllegalArgumentException> { graph.connect(source.output, inlet) }
+ }
+
+ @Test
+ fun testConnectInvalidOutlet() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val outlet = mockk<Outlet>()
+ val sink = SimpleFlowSink(graph, 2.0f)
+ assertThrows<IllegalArgumentException> { graph.connect(outlet, sink.input) }
+ }
+
+ @Test
+ fun testConnectInletBelongsToDifferentGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphB, 2.0f)
+ val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphA, 2.0f)
+ val source = SimpleFlowSource(graphB, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectInletAlreadyConnected() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 2.0f)
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(sourceA.output, sink.input)
+ assertThrows<IllegalStateException> { graph.connect(sourceB.output, sink.input) }
+ }
+
+ @Test
+ fun testConnectOutletAlreadyConnected() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sinkA = SimpleFlowSink(graph, 2.0f)
+ val sinkB = SimpleFlowSink(graph, 2.0f)
+ val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ graph.connect(source.output, sinkA.input)
+ assertThrows<IllegalStateException> { graph.connect(source.output, sinkB.input) }
+ }
+
+ @Test
+ fun testDisconnectInletInvalid() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val inlet = mockk<Inlet>()
+ assertThrows<IllegalArgumentException> { graph.disconnect(inlet) }
+ }
+
+ @Test
+ fun testDisconnectOutletInvalid() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val outlet = mockk<Outlet>()
+ assertThrows<IllegalArgumentException> { graph.disconnect(outlet) }
+ }
+
+ @Test
+ fun testDisconnectInletInvalidGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val sink = SimpleFlowSink(graphA, 2.0f)
+
+ assertThrows<IllegalArgumentException> { graphB.disconnect(sink.input) }
+ }
+
+ @Test
+ fun testDisconnectOutletInvalidGraph() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graphA = engine.newGraph()
+ val graphB = engine.newGraph()
+
+ val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
+
+ assertThrows<IllegalArgumentException> { graphB.disconnect(source.output) }
+ }
+
+ @Test
+ fun testInletEquality() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sinkA = SimpleFlowSink(graph, 2.0f)
+ val sinkB = SimpleFlowSink(graph, 2.0f)
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+
+ assertEquals(sinkA.input, sinkA.input)
+ assertNotEquals(sinkA.input, sinkB.input)
+
+ assertNotEquals(multiplexer.newInput(), multiplexer.newInput())
+ }
+
+ @Test
+ fun testOutletEquality() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
+ val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
+
+ val multiplexer = MaxMinFlowMultiplexer(graph)
+
+ assertEquals(sourceA.output, sourceA.output)
+ assertNotEquals(sourceA.output, sourceB.output)
+
+ assertNotEquals(multiplexer.newOutput(), multiplexer.newOutput())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
new file mode 100644
index 00000000..1824959c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
@@ -0,0 +1,385 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import io.mockk.mockk
+import org.junit.jupiter.api.Assertions.assertAll
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [FlowTimerQueue] class.
+ */
+class FlowTimerQueueTest {
+ private lateinit var queue: FlowTimerQueue
+
+ @BeforeEach
+ fun setUp() {
+ queue = FlowTimerQueue(3)
+ }
+
+ /**
+ * Test whether a call to [FlowTimerQueue.poll] returns `null` for an empty queue.
+ */
+ @Test
+ fun testPollEmpty() {
+ assertAll(
+ { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test whether a call to [FlowTimerQueue.poll] returns the proper value for a queue with a single entry.
+ */
+ @Test
+ fun testSingleEntry() {
+ val entry = mockk<FlowStage>()
+ entry.deadline = 100
+ entry.timerIndex = -1
+
+ queue.enqueue(entry)
+
+ assertAll(
+ { assertEquals(100, queue.peekDeadline()) },
+ { assertNull(queue.poll(10L)) },
+ { assertEquals(entry, queue.poll(200L)) },
+ { assertNull(queue.poll(200L)) }
+ )
+ }
+
+ /**
+ * Test whether [FlowTimerQueue.poll] returns values in the queue in the proper order.
+ */
+ @Test
+ fun testMultipleEntries() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 10
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that the queue is properly resized when the number of entries exceed the capacity.
+ */
+ @Test
+ fun testResize() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ val entryD = mockk<FlowStage>()
+ entryD.deadline = 31
+ entryD.timerIndex = -1
+
+ queue.enqueue(entryD)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryD, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test to verify that we can change the deadline of the last element in the queue.
+ */
+ @Test
+ fun testChangeDeadlineTail() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryA.deadline = 10
+ queue.enqueue(entryA)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can change the deadline of the head entry in the queue.
+ */
+ @Test
+ fun testChangeDeadlineMiddle() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = 10
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(10, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can change the deadline of the head entry in the queue.
+ */
+ @Test
+ fun testChangeDeadlineHead() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryB.deadline = 30
+ queue.enqueue(entryB)
+
+ assertAll(
+ { assertEquals(30, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that an unchanged deadline results in a no-op.
+ */
+ @Test
+ fun testChangeDeadlineNop() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ // Should be a no-op
+ queue.enqueue(entryA)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the end of the queue.
+ */
+ @Test
+ fun testRemoveEntryTail() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = Long.MAX_VALUE
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the head of the queue.
+ */
+ @Test
+ fun testRemoveEntryHead() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryB.deadline = Long.MAX_VALUE
+ queue.enqueue(entryB)
+
+ assertAll(
+ { assertEquals(58, queue.peekDeadline()) },
+ { assertEquals(entryC, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+
+ /**
+ * Test that we can remove an entry from the middle of a queue.
+ */
+ @Test
+ fun testRemoveEntryMiddle() {
+ val entryA = mockk<FlowStage>()
+ entryA.deadline = 100
+ entryA.timerIndex = -1
+
+ queue.enqueue(entryA)
+
+ val entryB = mockk<FlowStage>()
+ entryB.deadline = 20
+ entryB.timerIndex = -1
+
+ queue.enqueue(entryB)
+
+ val entryC = mockk<FlowStage>()
+ entryC.deadline = 58
+ entryC.timerIndex = -1
+
+ queue.enqueue(entryC)
+
+ entryC.deadline = Long.MAX_VALUE
+ queue.enqueue(entryC)
+
+ assertAll(
+ { assertEquals(20, queue.peekDeadline()) },
+ { assertEquals(entryB, queue.poll(100L)) },
+ { assertEquals(entryA, queue.poll(100L)) },
+ { assertNull(queue.poll(100L)) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
new file mode 100644
index 00000000..2250fe87
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+/**
+ * Test suite for the [InvocationStack] class.
+ */
+class InvocationStackTest {
+ private val stack = InvocationStack(2)
+
+ @Test
+ fun testPollEmpty() {
+ assertEquals(Long.MAX_VALUE, stack.poll())
+ }
+
+ @Test
+ fun testAddSingle() {
+ assertTrue(stack.tryAdd(10))
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testAddLater() {
+ assertTrue(stack.tryAdd(10))
+ assertFalse(stack.tryAdd(15))
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testAddEarlier() {
+ assertTrue(stack.tryAdd(10))
+ assertTrue(stack.tryAdd(5))
+ assertEquals(5, stack.poll())
+ assertEquals(10, stack.poll())
+ }
+
+ @Test
+ fun testCapacityExceeded() {
+ assertTrue(stack.tryAdd(10))
+ assertTrue(stack.tryAdd(5))
+ assertTrue(stack.tryAdd(2))
+ assertEquals(2, stack.poll())
+ assertEquals(5, stack.poll())
+ assertEquals(10, stack.poll())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
new file mode 100644
index 00000000..ba339ee3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for the [MaxMinFlowMultiplexer] class.
+ */
+class MaxMinFlowMultiplexerTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val switch = MaxMinFlowMultiplexer(graph)
+
+ val sinks = List(2) { SimpleFlowSink(graph, 2000.0f) }
+ for (source in sinks) {
+ graph.connect(switch.newOutput(), source.input)
+ }
+
+ val source = SimpleFlowSource(graph, 2000.0f, 1.0f)
+ graph.connect(source.output, switch.newInput())
+
+ advanceUntilIdle()
+
+ assertEquals(500, clock.millis())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
new file mode 100644
index 00000000..a75efba3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.sink
+
+import kotlinx.coroutines.delay
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.source.SimpleFlowSource
+import org.opendc.simulator.flow2.source.TraceFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+import java.util.concurrent.ThreadLocalRandom
+
+/**
+ * Test suite for the [SimpleFlowSink] class.
+ */
+class FlowSinkTest {
+ @Test
+ fun testSmoke() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 1.0f)
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testAdjustCapacity() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 1.0f)
+
+ graph.connect(source.output, sink.input)
+
+ delay(1000)
+ sink.capacity = 0.5f
+
+ advanceUntilIdle()
+
+ assertEquals(3000, clock.millis())
+ }
+
+ @Test
+ fun testUtilization() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val source = SimpleFlowSource(graph, 2.0f, 0.5f)
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(4000, clock.millis())
+ }
+
+ @Test
+ fun testFragments() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val sink = SimpleFlowSink(graph, 1.0f)
+ val trace = TraceFlowSource.Trace(
+ longArrayOf(1000, 2000, 3000, 4000),
+ floatArrayOf(1.0f, 0.5f, 2.0f, 1.0f),
+ 4
+ )
+ val source = TraceFlowSource(
+ graph,
+ trace
+ )
+
+ graph.connect(source.output, sink.input)
+ advanceUntilIdle()
+
+ assertEquals(4000, clock.millis())
+ }
+
+ @Test
+ fun benchmarkSink() {
+ val random = ThreadLocalRandom.current()
+ val traceSize = 10000000
+ val trace = TraceFlowSource.Trace(
+ LongArray(traceSize) { it * 1000L },
+ FloatArray(traceSize) { random.nextDouble(0.0, 4500.0).toFloat() },
+ traceSize
+ )
+
+ return runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+ val sink = SimpleFlowSink(graph, 4200.0f)
+ val source = TraceFlowSource(graph, trace)
+ graph.connect(source.output, sink.input)
+ }
+ }
+}