summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:32:05 +0200
committerGitHub <noreply@github.com>2022-10-21 22:32:05 +0200
commitfa7fdbb0126ea465130961dc37c4ef2d6feb36e9 (patch)
tree9cd46dd7970870b78990d6c35e8e2759d7cf5a13 /opendc-simulator/opendc-simulator-flow/src/main
parent29beb50018cf2ad87b252c6c080f8c5de4600349 (diff)
parent290e1fe14460d91e4703e55ac5f05dbe7b4505f7 (diff)
merge: Implement multi-flow stages in simulator (#110)
This pull request introduces the new `flow2` multi-flow simulator into the OpenDC codebase and adjust all existing modules to make use of this new simulator. The new simulator models flow as a network of components, which can each receive flow from (potentially) multiple other components. In the previous simulator, the framework itself supported only single flows between components and required re-implementation of many components to support multiplexing flows. Initial benchmarks show performance improvements in the range 2x–4x for large scale experiments such as the Capelin benchmarks. ## Implementation Notes :hammer_and_pick: * Add support for multi-flow stages * Support flow transformations * Add forwarding flow multiplexer * Expose metrics on FlowMultiplexer * Re-implement network sim using flow2 * Re-implement power sim using flow2 * Re-implement compute sim using flow2 * Optimize workload implementation of SimTrace * Remove old flow simulator * Add log4j-core dependency ## External Dependencies :four_leaf_clover: * N/A ## Breaking API Changes :warning: * Removal of the `org.opendc.simulator.flow` package. You should now use the new flow simulator located in `org.opendc.simulator.flow2`. * `PowerModel` interface is replaced by the `CpuPowerModel` interface. * `PowerDriver` interface is replaced by the `SimPsu` and `SimPsuFactory` interfaces. * Removal of `SimTraceWorkload`. Instead, create a workload from a `SimTrace` using `createWorkload(offset)`. * `ScalingGovernor` has been split in a `ScalingGovernor` and `ScalingGovernorFactory`. * All modules in `opendc-simulator` are now written in Java. This means that default parameters are not supported anymore for these modules.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java260
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java63
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java93
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java303
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java109
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java208
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt)39
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java214
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt)19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt)47
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java224
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java51
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java280
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java297
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt)16
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java123
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt)67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java128
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java151
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt)29
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java124
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt)50
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt57
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt264
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt75
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt160
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt44
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt436
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt119
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt218
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt200
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt124
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt60
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt177
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt811
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt66
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt67
49 files changed, 3295 insertions, 3388 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
new file mode 100644
index 00000000..0ebb0da9
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import kotlin.coroutines.ContinuationInterceptor;
+import kotlin.coroutines.CoroutineContext;
+import kotlinx.coroutines.Delay;
+
+/**
+ * A {@link FlowEngine} simulates a generic flow network.
+ * <p>
+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
+ * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
+ */
+public final class FlowEngine implements Runnable {
+ /**
+ * The queue of {@link FlowStage} updates that are scheduled for immediate execution.
+ */
+ private final FlowStageQueue queue = new FlowStageQueue(256);
+
+ /**
+ * A priority queue containing the {@link FlowStage} updates to be scheduled in the future.
+ */
+ private final FlowTimerQueue timerQueue = new FlowTimerQueue(256);
+
+ /**
+ * The stack of engine invocations to occur in the future.
+ */
+ private final InvocationStack futureInvocations = new InvocationStack(256);
+
+ /**
+ * A flag to indicate that the engine is active.
+ */
+ private boolean active;
+
+ private final CoroutineContext coroutineContext;
+ private final Clock clock;
+ private final Delay delay;
+
+ /**
+ * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}.
+ */
+ public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) {
+ return new FlowEngine(coroutineContext, clock);
+ }
+
+ FlowEngine(CoroutineContext coroutineContext, Clock clock) {
+ this.coroutineContext = coroutineContext;
+ this.clock = clock;
+
+ CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key;
+ this.delay = (Delay) coroutineContext.get(key);
+ }
+
+ /**
+ * Obtain the (virtual) {@link Clock} driving the simulation.
+ */
+ public Clock getClock() {
+ return clock;
+ }
+
+ /**
+ * Return a new {@link FlowGraph} that can be used to build a flow network.
+ */
+ public FlowGraph newGraph() {
+ return new RootGraph(this);
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle.
+ * <p>
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed.
+ */
+ void scheduleImmediate(long now, FlowStage ctx) {
+ scheduleImmediateInContext(ctx);
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (active) {
+ return;
+ }
+
+ trySchedule(futureInvocations, now, now);
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle.
+ * <p>
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed.
+ * <p>
+ * This method should only be invoked while inside an engine cycle.
+ */
+ void scheduleImmediateInContext(FlowStage ctx) {
+ queue.add(ctx);
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated at its updated deadline.
+ */
+ void scheduleDelayed(FlowStage ctx) {
+ scheduleDelayedInContext(ctx);
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (active) {
+ return;
+ }
+
+ long deadline = timerQueue.peekDeadline();
+ if (deadline != Long.MAX_VALUE) {
+ trySchedule(futureInvocations, clock.millis(), deadline);
+ }
+ }
+
+ /**
+ * Enqueue the specified {@link FlowStage} to be updated at its updated deadline.
+ * <p>
+ * This method should only be invoked while inside an engine cycle.
+ */
+ void scheduleDelayedInContext(FlowStage ctx) {
+ FlowTimerQueue timerQueue = this.timerQueue;
+ timerQueue.enqueue(ctx);
+ }
+
+ /**
+ * Run all the enqueued actions for the specified timestamp (<code>now</code>).
+ */
+ private void doRunEngine(long now) {
+ final FlowStageQueue queue = this.queue;
+ final FlowTimerQueue timerQueue = this.timerQueue;
+
+ try {
+ // Mark the engine as active to prevent concurrent calls to this method
+ active = true;
+
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ final FlowStage ctx = timerQueue.poll(now);
+ if (ctx == null) {
+ break;
+ }
+
+ ctx.onUpdate(now);
+ }
+
+ // Execute all immediate updates
+ while (true) {
+ final FlowStage ctx = queue.poll();
+ if (ctx == null) {
+ break;
+ }
+
+ ctx.onUpdate(now);
+ }
+ } finally {
+ active = false;
+ }
+
+ // Schedule an engine invocation for the next update to occur.
+ long headDeadline = timerQueue.peekDeadline();
+ if (headDeadline != Long.MAX_VALUE && headDeadline >= now) {
+ trySchedule(futureInvocations, now, headDeadline);
+ }
+ }
+
+ @Override
+ public void run() {
+ doRunEngine(futureInvocations.poll());
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param scheduled The queue of scheduled invocations.
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ */
+ private void trySchedule(InvocationStack scheduled, long now, long target) {
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (scheduled.tryAdd(target)) {
+ delay.invokeOnTimeout(target - now, this, coroutineContext);
+ }
+ }
+
+ /**
+ * Internal implementation of a root {@link FlowGraph}.
+ */
+ private static final class RootGraph implements FlowGraphInternal {
+ private final FlowEngine engine;
+ private final List<FlowStage> stages = new ArrayList<>();
+
+ public RootGraph(FlowEngine engine) {
+ this.engine = engine;
+ }
+
+ @Override
+ public FlowEngine getEngine() {
+ return engine;
+ }
+
+ @Override
+ public FlowStage newStage(FlowStageLogic logic) {
+ final FlowEngine engine = this.engine;
+ final FlowStage stage = new FlowStage(this, logic);
+ stages.add(stage);
+ long now = engine.getClock().millis();
+ stage.invalidate(now);
+ return stage;
+ }
+
+ @Override
+ public void connect(Outlet outlet, Inlet inlet) {
+ FlowGraphInternal.connect(this, outlet, inlet);
+ }
+
+ @Override
+ public void disconnect(Outlet outlet) {
+ FlowGraphInternal.disconnect(this, outlet);
+ }
+
+ @Override
+ public void disconnect(Inlet inlet) {
+ FlowGraphInternal.disconnect(this, inlet);
+ }
+
+ /**
+ * Internal method to remove the specified {@link FlowStage} from the graph.
+ */
+ @Override
+ public void detach(FlowStage stage) {
+ stages.remove(stage);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java
new file mode 100644
index 00000000..f45be6cd
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an
+ * amount of flow that cannot exceed the edge's capacity.
+ */
+public interface FlowGraph {
+ /**
+ * Return the {@link FlowEngine} driving the simulation of the graph.
+ */
+ FlowEngine getEngine();
+
+ /**
+ * Create a new {@link FlowStage} representing a node in the flow network.
+ *
+ * @param logic The logic for handling the events of the stage.
+ */
+ FlowStage newStage(FlowStageLogic logic);
+
+ /**
+ * Add an edge between the specified outlet port and inlet port in this graph.
+ *
+ * @param outlet The outlet of the source from which the flow originates.
+ * @param inlet The inlet of the sink that should receive the flow.
+ */
+ void connect(Outlet outlet, Inlet inlet);
+
+ /**
+ * Disconnect the specified {@link Outlet} (if connected).
+ *
+ * @param outlet The outlet to disconnect.
+ */
+ void disconnect(Outlet outlet);
+
+ /**
+ * Disconnect the specified {@link Inlet} (if connected).
+ *
+ * @param inlet The inlet to disconnect.
+ */
+ void disconnect(Inlet inlet);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java
new file mode 100644
index 00000000..0f608b60
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * Interface implemented by {@link FlowGraph} implementations.
+ */
+interface FlowGraphInternal extends FlowGraph {
+ /**
+ * Internal method to remove the specified {@link FlowStage} from the graph.
+ */
+ void detach(FlowStage stage);
+
+ /**
+ * Helper method to connect an outlet to an inlet.
+ */
+ static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) {
+ if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) {
+ throw new IllegalArgumentException("Invalid outlet or inlet passed to graph");
+ }
+
+ InPort inPort = (InPort) inlet;
+ OutPort outPort = (OutPort) outlet;
+
+ if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) {
+ throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
+ } else if (outPort.input != null || inPort.output != null) {
+ throw new IllegalStateException("Inlet or outlet already connected");
+ }
+
+ outPort.input = inPort;
+ inPort.output = outPort;
+
+ inPort.connect();
+ outPort.connect();
+ }
+
+ /**
+ * Helper method to disconnect an outlet.
+ */
+ static void disconnect(FlowGraph graph, Outlet outlet) {
+ if (!(outlet instanceof OutPort)) {
+ throw new IllegalArgumentException("Invalid outlet passed to graph");
+ }
+
+ OutPort outPort = (OutPort) outlet;
+
+ if (!graph.equals(outPort.getGraph())) {
+ throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
+ }
+
+ outPort.cancel(null);
+ outPort.complete();
+ }
+
+ /**
+ * Helper method to disconnect an inlet.
+ */
+ static void disconnect(FlowGraph graph, Inlet inlet) {
+ if (!(inlet instanceof InPort)) {
+ throw new IllegalArgumentException("Invalid outlet passed to graph");
+ }
+
+ InPort inPort = (InPort) inlet;
+
+ if (!graph.equals(inPort.getGraph())) {
+ throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
+ }
+
+ inPort.finish(null);
+ inPort.cancel(null);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
new file mode 100644
index 00000000..4d098043
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FlowStage} represents a node in a {@link FlowGraph}.
+ */
+public final class FlowStage {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class);
+
+ /**
+ * States of the flow stage.
+ */
+ private static final int STAGE_PENDING = 0; // Stage is pending to be started
+
+ private static final int STAGE_ACTIVE = 1; // Stage is actively running
+ private static final int STAGE_CLOSED = 2; // Stage is closed
+ private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage
+
+ /**
+ * Flags of the flow connection
+ */
+ private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated
+
+ private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed
+ private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active
+ private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending
+
+ /**
+ * The flags representing the state and pending actions for the stage.
+ */
+ private int flags = STAGE_PENDING;
+
+ /**
+ * The deadline of the stage after which an update should run.
+ */
+ long deadline = Long.MAX_VALUE;
+
+ /**
+ * The index of the timer in the {@link FlowTimerQueue}.
+ */
+ int timerIndex = -1;
+
+ final Clock clock;
+ private final FlowStageLogic logic;
+ final FlowGraphInternal parentGraph;
+ private final FlowEngine engine;
+
+ private final Map<String, InPort> inlets = new HashMap<>();
+ private final Map<String, OutPort> outlets = new HashMap<>();
+ private int nextInlet = 0;
+ private int nextOutlet = 0;
+
+ /**
+ * Construct a new {@link FlowStage} instance.
+ *
+ * @param parentGraph The {@link FlowGraph} this stage belongs to.
+ * @param logic The logic of the stage.
+ */
+ FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) {
+ this.parentGraph = parentGraph;
+ this.logic = logic;
+ this.engine = parentGraph.getEngine();
+ this.clock = engine.getClock();
+ }
+
+ /**
+ * Return the {@link FlowGraph} to which this stage belongs.
+ */
+ public FlowGraph getGraph() {
+ return parentGraph;
+ }
+
+ /**
+ * Return the {@link Inlet} (an in-going edge) with the specified <code>name</code> for this {@link FlowStage}.
+ * If an inlet with that name does not exist, a new one is allocated for the stage.
+ *
+ * @param name The name of the inlet.
+ * @return The {@link InPort} representing an {@link Inlet} with the specified <code>name</code>.
+ */
+ public InPort getInlet(String name) {
+ return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++));
+ }
+
+ /**
+ * Return the {@link Outlet} (an out-going edge) with the specified <code>name</code> for this {@link FlowStage}.
+ * If an outlet with that name does not exist, a new one is allocated for the stage.
+ *
+ * @param name The name of the outlet.
+ * @return The {@link OutPort} representing an {@link Outlet} with the specified <code>name</code>.
+ */
+ public OutPort getOutlet(String name) {
+ return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++));
+ }
+
+ /**
+ * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch).
+ */
+ public long getDeadline() {
+ return deadline;
+ }
+
+ /**
+ * Set the deadline of the {@link FlowStage}'s timer.
+ *
+ * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted.
+ */
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+
+ if ((flags & STAGE_UPDATE_ACTIVE) == 0) {
+ // Update the timer queue with the new deadline
+ engine.scheduleDelayed(this);
+ }
+ }
+
+ /**
+ * Invalidate the {@link FlowStage} forcing the stage to update.
+ */
+ public void invalidate() {
+ int flags = this.flags;
+
+ if ((flags & STAGE_UPDATE_ACTIVE) == 0) {
+ scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE);
+ }
+ }
+
+ /**
+ * Close the {@link FlowStage} and disconnect all inlets and outlets.
+ */
+ public void close() {
+ int flags = this.flags;
+
+ if ((flags & STAGE_STATE) == STAGE_CLOSED) {
+ return;
+ }
+
+ // Toggle the close bit. In case no update is active, schedule a new update.
+ if ((flags & STAGE_UPDATE_ACTIVE) != 0) {
+ this.flags = flags | STAGE_CLOSE;
+ } else {
+ scheduleImmediate(clock.millis(), flags | STAGE_CLOSE);
+ }
+ }
+
+ /**
+ * Update the state of the flow stage.
+ *
+ * @param now The current virtual timestamp.
+ */
+ void onUpdate(long now) {
+ int flags = this.flags;
+ int state = flags & STAGE_STATE;
+
+ if (state == STAGE_ACTIVE) {
+ doUpdate(now, flags);
+ } else if (state == STAGE_PENDING) {
+ doStart(now, flags);
+ }
+ }
+
+ /**
+ * Invalidate the {@link FlowStage} forcing the stage to update.
+ *
+ * <p>
+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to
+ * prevent having to re-query the clock. This method should not be called during an update.
+ */
+ void invalidate(long now) {
+ scheduleImmediate(now, flags | STAGE_INVALIDATE);
+ }
+
+ /**
+ * Schedule an immediate update for this stage.
+ */
+ private void scheduleImmediate(long now, int flags) {
+ // In case an immediate update is already scheduled, no need to do anything
+ if ((flags & STAGE_UPDATE_PENDING) != 0) {
+ this.flags = flags;
+ return;
+ }
+
+ // Mark the stage that there is an update pending
+ this.flags = flags | STAGE_UPDATE_PENDING;
+
+ engine.scheduleImmediate(now, this);
+ }
+
+ /**
+ * Start the stage.
+ */
+ private void doStart(long now, int flags) {
+ // Update state before calling into the outside world, so it observes a consistent state
+ flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE;
+
+ doUpdate(now, flags);
+ }
+
+ /**
+ * Update the state of the stage.
+ */
+ private void doUpdate(long now, int flags) {
+ long deadline = this.deadline;
+ long newDeadline = deadline;
+
+ // Update the stage if:
+ // (1) the timer of the stage has expired.
+ // (2) one of the input ports is pushed,
+ // (3) one of the output ports is pulled,
+ if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) {
+ // Update state before calling into the outside world, so it observes a consistent state
+ this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE;
+
+ try {
+ newDeadline = logic.onUpdate(this, now);
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = this.flags;
+ } catch (Exception e) {
+ doFail(e);
+ }
+ }
+
+ // Check whether the stage is marked as closing.
+ if ((flags & STAGE_CLOSE) != 0) {
+ doClose(flags, null);
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = this.flags;
+ }
+
+ // Indicate that no update is active anymore and flush the flags
+ this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING);
+ this.deadline = newDeadline;
+
+ // Update the timer queue with the new deadline
+ engine.scheduleDelayedInContext(this);
+ }
+
+ /**
+ * This method is invoked when an uncaught exception is caught by the engine. When this happens, the
+ * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs.
+ */
+ void doFail(Throwable cause) {
+ LOGGER.warn("Uncaught exception (closing stage)", cause);
+
+ doClose(flags, cause);
+ }
+
+ /**
+ * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure.
+ */
+ private void doClose(int flags, Throwable cause) {
+ // Mark the stage as closed
+ this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED;
+
+ // Remove stage from parent graph
+ parentGraph.detach(this);
+
+ // Remove stage from the timer queue
+ setDeadline(Long.MAX_VALUE);
+
+ // Cancel all input ports
+ for (InPort port : inlets.values()) {
+ if (port != null) {
+ port.cancel(cause);
+ }
+ }
+
+ // Cancel all output ports
+ for (OutPort port : outlets.values()) {
+ if (port != null) {
+ port.fail(cause);
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java
new file mode 100644
index 00000000..70986a35
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via
+ * out-going flows based on its potential inputs.
+ */
+public interface FlowStageLogic {
+ /**
+ * This method is invoked when the one of the stage's inlets or outlets is invalidated.
+ *
+ * @param ctx The context in which the stage runs.
+ * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring.
+ * @return The next deadline for the stage.
+ */
+ long onUpdate(FlowStage ctx, long now);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java
new file mode 100644
index 00000000..56ec7702
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+
+/**
+ * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s
+ * that have been updated during the engine cycle and should converge.
+ * <p>
+ * By using a specialized class, we reduce the overhead caused by type-erasure.
+ */
+final class FlowStageQueue {
+ /**
+ * The array of elements in the queue.
+ */
+ private FlowStage[] elements;
+
+ private int head = 0;
+ private int tail = 0;
+
+ public FlowStageQueue(int initialCapacity) {
+ elements = new FlowStage[initialCapacity];
+ }
+
+ /**
+ * Add the specified context to the queue.
+ */
+ void add(FlowStage ctx) {
+ final FlowStage[] es = elements;
+ int tail = this.tail;
+
+ es[tail] = ctx;
+
+ tail = inc(tail, es.length);
+ this.tail = tail;
+
+ if (head == tail) {
+ doubleCapacity();
+ }
+ }
+
+ /**
+ * Remove a {@link FlowStage} from the queue or <code>null</code> if the queue is empty.
+ */
+ FlowStage poll() {
+ final FlowStage[] es = elements;
+ int head = this.head;
+ FlowStage ctx = es[head];
+
+ if (ctx != null) {
+ es[head] = null;
+ this.head = inc(head, es.length);
+ }
+
+ return ctx;
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private void doubleCapacity() {
+ int oldCapacity = elements.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+ if (newCapacity < 0) {
+ throw new IllegalStateException("Sorry, deque too big");
+ }
+
+ final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity);
+
+ // Exceptionally, here tail == head needs to be disambiguated
+ if (tail < head || (tail == head && es[head] != null)) {
+ // wrap around; slide first leg forward to end of array
+ int newSpace = newCapacity - oldCapacity;
+ System.arraycopy(es, head, es, head + newSpace, oldCapacity - head);
+ for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null;
+ }
+ }
+
+ /**
+ * Circularly increments i, mod modulus.
+ * Precondition and postcondition: 0 <= i < modulus.
+ */
+ private static int inc(int i, int modulus) {
+ if (++i >= modulus) i = 0;
+ return i;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java
new file mode 100644
index 00000000..4b746202
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.util.Arrays;
+
+/**
+ * A specialized priority queue for timers of {@link FlowStageLogic}s.
+ * <p>
+ * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
+ * being generic.
+ */
+final class FlowTimerQueue {
+ /**
+ * Array representation of binary heap of {@link FlowStage} instances.
+ */
+ private FlowStage[] queue;
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private int size = 0;
+
+ /**
+ * Construct a {@link FlowTimerQueue} with the specified initial capacity.
+ *
+ * @param initialCapacity The initial capacity of the queue.
+ */
+ public FlowTimerQueue(int initialCapacity) {
+ this.queue = new FlowStage[initialCapacity];
+ }
+
+ /**
+ * Enqueue a timer for the specified context or update the existing timer.
+ */
+ void enqueue(FlowStage ctx) {
+ FlowStage[] es = queue;
+ int k = ctx.timerIndex;
+
+ if (ctx.deadline != Long.MAX_VALUE) {
+ if (k >= 0) {
+ update(es, ctx, k);
+ } else {
+ add(es, ctx);
+ }
+ } else if (k >= 0) {
+ delete(es, k);
+ }
+ }
+
+ /**
+ * Retrieve the head of the queue if its deadline does not exceed <code>now</code>.
+ *
+ * @param now The timestamp that the deadline of the head of the queue should not exceed.
+ * @return The head of the queue if its deadline does not exceed <code>now</code>, otherwise <code>null</code>.
+ */
+ FlowStage poll(long now) {
+ int size = this.size;
+ if (size == 0) {
+ return null;
+ }
+
+ final FlowStage[] es = queue;
+ final FlowStage head = es[0];
+
+ if (now < head.deadline) {
+ return null;
+ }
+
+ int n = size - 1;
+ this.size = n;
+ final FlowStage next = es[n];
+ es[n] = null; // Clear the last element of the queue
+
+ if (n > 0) {
+ siftDown(0, next, es, n);
+ }
+
+ head.timerIndex = -1;
+ return head;
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ */
+ long peekDeadline() {
+ if (size > 0) {
+ return queue[0].deadline;
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Add a new entry to the queue.
+ */
+ private void add(FlowStage[] es, FlowStage ctx) {
+ int i = size;
+
+ if (i >= es.length) {
+ // Re-fetch the resized array
+ es = grow();
+ }
+
+ siftUp(i, ctx, es);
+
+ size = i + 1;
+ }
+
+ /**
+ * Update the deadline of an existing entry in the queue.
+ */
+ private void update(FlowStage[] es, FlowStage ctx, int k) {
+ if (k > 0) {
+ int parent = (k - 1) >>> 1;
+ if (es[parent].deadline > ctx.deadline) {
+ siftUp(k, ctx, es);
+ return;
+ }
+ }
+
+ siftDown(k, ctx, es, size);
+ }
+
+ /**
+ * Deadline an entry from the queue.
+ */
+ private void delete(FlowStage[] es, int k) {
+ int s = --size;
+ if (s == k) {
+ es[k] = null; // Element is last in the queue
+ } else {
+ FlowStage moved = es[s];
+ es[s] = null;
+
+ siftDown(k, moved, es, s);
+
+ if (es[k] == moved) {
+ siftUp(k, moved, es);
+ }
+ }
+ }
+
+ /**
+ * Increases the capacity of the array.
+ */
+ private FlowStage[] grow() {
+ FlowStage[] queue = this.queue;
+ int oldCapacity = queue.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+
+ queue = Arrays.copyOf(queue, newCapacity);
+ this.queue = queue;
+ return queue;
+ }
+
+ private static void siftUp(int k, FlowStage key, FlowStage[] es) {
+ while (k > 0) {
+ int parent = (k - 1) >>> 1;
+ FlowStage e = es[parent];
+ if (key.deadline >= e.deadline) break;
+ es[k] = e;
+ e.timerIndex = k;
+ k = parent;
+ }
+ es[k] = key;
+ key.timerIndex = k;
+ }
+
+ private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) {
+ int half = n >>> 1; // loop while a non-leaf
+ while (k < half) {
+ int child = (k << 1) + 1; // assume left child is least
+ FlowStage c = es[child];
+ int right = child + 1;
+ if (right < n && c.deadline > es[right].deadline) c = es[child = right];
+
+ if (key.deadline <= c.deadline) break;
+
+ es[k] = c;
+ c.timerIndex = k;
+ k = child;
+ }
+
+ es[k] = key;
+ key.timerIndex = k;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java
new file mode 100644
index 00000000..839b01db
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}.
+ */
+public interface InHandler {
+ /**
+ * Return the actual flow rate over the input port.
+ *
+ * @param port The input port to which the flow was pushed.
+ * @return The actual flow rate over the port.
+ */
+ default float getRate(InPort port) {
+ return Math.min(port.getDemand(), port.getCapacity());
+ }
+
+ /**
+ * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet.
+ *
+ * @param port The input port to which the flow was pushed.
+ * @param demand The rate of flow the output attempted to push to the port.
+ */
+ void onPush(InPort port, float demand);
+
+ /**
+ * This method is invoked when the input port is finished.
+ *
+ * @param port The input port that has finished.
+ * @param cause The cause of the input port being finished or <code>null</code> if the port completed successfully.
+ */
+ void onUpstreamFinish(InPort port, Throwable cause);
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java
index b3191ad3..9d5b4bef 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,33 +20,34 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow.source
+package org.opendc.simulator.flow2;
/**
- * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
- * finish a pull, before proceeding its operation.
+ * A collection of common {@link InHandler} implementations.
*/
-public class FlowSourceBarrier(public val parties: Int) {
- private var counter = 0
+public class InHandlers {
+ /**
+ * Prevent construction of this class.
+ */
+ private InHandlers() {}
/**
- * Enter the barrier and determine whether the caller is the last to reach the barrier.
- *
- * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ * Return an {@link InHandler} that does nothing.
*/
- public fun enter(): Boolean {
- val last = ++counter == parties
- if (last) {
- counter = 0
- return true
- }
- return false
+ public static InHandler noop() {
+ return NoopInHandler.INSTANCE;
}
/**
- * Reset the barrier.
+ * No-op implementation of {@link InHandler}.
*/
- public fun reset() {
- counter = 0
+ private static final class NoopInHandler implements InHandler {
+ public static final InHandler INSTANCE = new NoopInHandler();
+
+ @Override
+ public void onPush(InPort port, float demand) {}
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {}
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
new file mode 100644
index 00000000..fba12aaf
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.Objects;
+
+/**
+ * A port that consumes a flow.
+ * <p>
+ * Input ports are represented as in-going edges in the flow graph.
+ */
+public final class InPort implements Inlet {
+ private final int id;
+
+ private float capacity;
+ private float demand;
+
+ private boolean mask;
+
+ OutPort output;
+ private InHandler handler = InHandlers.noop();
+ private final Clock clock;
+ private final String name;
+ private final FlowStage stage;
+
+ InPort(FlowStage stage, String name, int id) {
+ this.name = name;
+ this.id = id;
+ this.stage = stage;
+ this.clock = stage.clock;
+ }
+
+ @Override
+ public FlowGraph getGraph() {
+ return stage.parentGraph;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Return the identifier of the {@link InPort} with respect to its stage.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Return the current capacity of the input port.
+ */
+ public float getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Return the current demand of flow of the input port.
+ */
+ public float getDemand() {
+ return demand;
+ }
+
+ /**
+ * Return the current rate of flow of the input port.
+ */
+ public float getRate() {
+ return handler.getRate(this);
+ }
+
+ /**
+ * Pull the flow with the specified <code>capacity</code> from the input port.
+ *
+ * @param capacity The maximum throughput that the stage can receive from the input port.
+ */
+ public void pull(float capacity) {
+ this.capacity = capacity;
+
+ OutPort output = this.output;
+ if (output != null) {
+ output.pull(capacity);
+ }
+ }
+
+ /**
+ * Return the current {@link InHandler} of the input port.
+ */
+ public InHandler getHandler() {
+ return handler;
+ }
+
+ /**
+ * Set the {@link InHandler} of the input port.
+ */
+ public void setHandler(InHandler handler) {
+ this.handler = handler;
+ }
+
+ /**
+ * Return the mask of this port.
+ * <p>
+ * Stages ignore events originating from masked ports.
+ */
+ public boolean getMask() {
+ return mask;
+ }
+
+ /**
+ * (Un)mask the port.
+ */
+ public void setMask(boolean mask) {
+ this.mask = mask;
+ }
+
+ /**
+ * Disconnect the input port from its (potentially) connected outlet.
+ * <p>
+ * The inlet can still be used and re-connected to another outlet.
+ *
+ * @param cause The cause for disconnecting the port or <code>null</code> when no more flow is needed.
+ */
+ public void cancel(Throwable cause) {
+ demand = 0.f;
+
+ OutPort output = this.output;
+ if (output != null) {
+ this.output = null;
+ output.input = null;
+ output.cancel(cause);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InPort port = (InPort) o;
+ return stage.equals(port.stage) && name.equals(port.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(stage.parentGraph, name);
+ }
+
+ /**
+ * This method is invoked when the inlet is connected to an outlet.
+ */
+ void connect() {
+ OutPort output = this.output;
+ output.pull(capacity);
+ }
+
+ /**
+ * Push a flow from an outlet to this inlet.
+ *
+ * @param demand The rate of flow to push.
+ */
+ void push(float demand) {
+ // No-op when the rate is unchanged
+ if (this.demand == demand) {
+ return;
+ }
+
+ try {
+ handler.onPush(this, demand);
+ this.demand = demand;
+
+ if (!mask) {
+ stage.invalidate(clock.millis());
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+
+ /**
+ * This method is invoked by the connected {@link OutPort} when it finishes.
+ */
+ void finish(Throwable cause) {
+ try {
+ long now = clock.millis();
+ handler.onUpstreamFinish(this, cause);
+ this.demand = 0.f;
+
+ if (!mask) {
+ stage.invalidate(now);
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java
index 62cb10d1..4a9ea6a5 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,16 +20,19 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow
+package org.opendc.simulator.flow2;
/**
- * A listener interface for when a flow stage has converged into a steady-state.
+ * An in-going edge in a {@link FlowGraph}.
*/
-public interface FlowConvergenceListener {
+public interface Inlet {
/**
- * This method is invoked when the system has converged to a steady-state.
- *
- * @param now The timestamp at which the system converged.
+ * Return the {@link FlowGraph} to which the inlet is exposed.
*/
- public fun onConverge(now: Long) {}
+ FlowGraph getGraph();
+
+ /**
+ * Return the name of the inlet.
+ */
+ String getName();
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java
new file mode 100644
index 00000000..a5b5114b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.util.Arrays;
+
+/**
+ * A specialized monotonic stack implementation for tracking the scheduled engine invocations.
+ * <p>
+ * By using a specialized class, we reduce the overhead caused by type-erasure.
+ */
+final class InvocationStack {
+ /**
+ * The array of elements in the stack.
+ */
+ private long[] elements;
+
+ private int head = -1;
+
+ public InvocationStack(int initialCapacity) {
+ elements = new long[initialCapacity];
+ Arrays.fill(elements, Long.MIN_VALUE);
+ }
+
+ /**
+ * Try to add the specified invocation to the monotonic stack.
+ *
+ * @param invocation The timestamp of the invocation.
+ * @return <code>true</code> if the invocation was added, <code>false</code> otherwise.
+ */
+ boolean tryAdd(long invocation) {
+ final long[] es = elements;
+ int head = this.head;
+
+ if (head < 0 || es[head] > invocation) {
+ es[head + 1] = invocation;
+ this.head = head + 1;
+
+ if (head + 2 == es.length) {
+ doubleCapacity();
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty.
+ */
+ long poll() {
+ final long[] es = elements;
+ int head = this.head--;
+
+ if (head >= 0) {
+ return es[head];
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private void doubleCapacity() {
+ int oldCapacity = elements.length;
+ int newCapacity = oldCapacity + (oldCapacity >> 1);
+ if (newCapacity < 0) {
+ throw new IllegalStateException("Sorry, deque too big");
+ }
+
+ elements = Arrays.copyOf(elements, newCapacity);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java
index 98922ab3..723c6d6b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,43 +20,28 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow
+package org.opendc.simulator.flow2;
/**
- * A controllable [FlowConnection].
- *
- * This interface is used by [FlowConsumer]s to control the connection between it and the source.
+ * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}.
*/
-public interface FlowConsumerContext : FlowConnection {
- /**
- * The deadline of the source.
- */
- public val deadline: Long
-
- /**
- * The capacity of the connection.
- */
- public override var capacity: Double
-
+public interface OutHandler {
/**
- * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer.
- */
- public var shouldConsumerConverge: Boolean
-
- /**
- * A flag to control whether the timers for the [FlowSource] should be enabled.
- */
- public var enableTimers: Boolean
-
- /**
- * Start the flow over the connection.
+ * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet.
+ *
+ * @param port The output port of which the capacity was changed.
+ * @param capacity The new capacity of the outlet.
*/
- public fun start()
+ void onPull(OutPort port, float capacity);
/**
- * Synchronously pull the source of the connection.
+ * This method is invoked when the output port no longer accepts any flow.
+ * <p>
+ * After this callback no other callbacks will be called for this port.
*
- * @param now The timestamp at which the connection is pulled.
+ * @param port The outlet that no longer accepts any flow.
+ * @param cause The cause of the output port no longer accepting any flow or <code>null</code> if the port closed
+ * successfully.
*/
- public fun pullSync(now: Long)
+ void onDownstreamFinish(OutPort port, Throwable cause);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java
new file mode 100644
index 00000000..8fbfda0d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * A collection of common {@link OutHandler} implementations.
+ */
+public class OutHandlers {
+ /**
+ * Prevent construction of this class.
+ */
+ private OutHandlers() {}
+
+ /**
+ * Return an {@link OutHandler} that does nothing.
+ */
+ public static OutHandler noop() {
+ return NoopOutHandler.INSTANCE;
+ }
+
+ /**
+ * No-op implementation of {@link OutHandler}.
+ */
+ private static final class NoopOutHandler implements OutHandler {
+ public static final OutHandler INSTANCE = new NoopOutHandler();
+
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {}
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
new file mode 100644
index 00000000..332296a0
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+import java.time.Clock;
+import java.util.Objects;
+
+/**
+ * A port that outputs a flow.
+ * <p>
+ * Output ports are represented as out-going edges in the flow graph.
+ */
+public final class OutPort implements Outlet {
+ private final int id;
+
+ private float capacity;
+ private float demand;
+
+ private boolean mask;
+
+ InPort input;
+ private OutHandler handler = OutHandlers.noop();
+ private final String name;
+ private final FlowStage stage;
+ private final Clock clock;
+
+ OutPort(FlowStage stage, String name, int id) {
+ this.name = name;
+ this.id = id;
+ this.stage = stage;
+ this.clock = stage.clock;
+ }
+
+ @Override
+ public FlowGraph getGraph() {
+ return stage.parentGraph;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Return the identifier of the {@link OutPort} with respect to its stage.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Return the capacity of the output port.
+ */
+ public float getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Return the current demand of flow of the output port.
+ */
+ public float getDemand() {
+ return demand;
+ }
+
+ /**
+ * Return the current rate of flow of the input port.
+ */
+ public float getRate() {
+ InPort input = this.input;
+ if (input != null) {
+ return input.getRate();
+ }
+
+ return 0.f;
+ }
+
+ /**
+ * Return the current {@link OutHandler} of the output port.
+ */
+ public OutHandler getHandler() {
+ return handler;
+ }
+
+ /**
+ * Set the {@link OutHandler} of the output port.
+ */
+ public void setHandler(OutHandler handler) {
+ this.handler = handler;
+ }
+
+ /**
+ * Return the mask of this port.
+ * <p>
+ * Stages ignore events originating from masked ports.
+ */
+ public boolean getMask() {
+ return mask;
+ }
+
+ /**
+ * (Un)mask the port.
+ */
+ public void setMask(boolean mask) {
+ this.mask = mask;
+ }
+
+ /**
+ * Push the given flow rate over output port.
+ *
+ * @param rate The rate of the flow to push.
+ */
+ public void push(float rate) {
+ demand = rate;
+ InPort input = this.input;
+
+ if (input != null) {
+ input.push(rate);
+ }
+ }
+
+ /**
+ * Signal to the downstream port that the output has completed successfully and disconnect the port from its input.
+ * <p>
+ * The output port can still be used and re-connected to another input.
+ */
+ public void complete() {
+ fail(null);
+ }
+
+ /**
+ * Signal a failure to the downstream port and disconnect the port from its input.
+ * <p>
+ * The output can still be used and re-connected to another input.
+ */
+ public void fail(Throwable cause) {
+ capacity = 0.f;
+
+ InPort input = this.input;
+ if (input != null) {
+ this.input = null;
+ input.output = null;
+ input.finish(cause);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OutPort port = (OutPort) o;
+ return stage.equals(port.stage) && name.equals(port.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(stage.parentGraph, name);
+ }
+
+ /**
+ * This method is invoked when the outlet is connected to an inlet.
+ */
+ void connect() {
+ input.push(demand);
+ }
+
+ /**
+ * Pull from this outlet with a specified capacity.
+ *
+ * @param capacity The capacity of the inlet.
+ */
+ void pull(float capacity) {
+ // No-op when outlet is not active or the rate is unchanged
+ if (this.capacity == capacity) {
+ return;
+ }
+
+ try {
+ handler.onPull(this, capacity);
+ this.capacity = capacity;
+
+ if (!mask) {
+ stage.invalidate(clock.millis());
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+
+ /**
+ * This method is invoked by the connected {@link InPort} when downstream cancels the connection.
+ */
+ void cancel(Throwable cause) {
+ try {
+ handler.onDownstreamFinish(this, cause);
+ this.capacity = 0.f;
+
+ if (!mask) {
+ stage.invalidate(clock.millis());
+ }
+ } catch (Exception e) {
+ stage.doFail(e);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java
new file mode 100644
index 00000000..32e19a3b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2;
+
+/**
+ * An out-going edge in a {@link FlowGraph}.
+ */
+public interface Outlet {
+ /**
+ * Return the {@link FlowGraph} to which the outlet is exposed.
+ */
+ FlowGraph getGraph();
+
+ /**
+ * Return the name of the outlet.
+ */
+ String getName();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java
new file mode 100644
index 00000000..dec98955
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.Inlet;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs.
+ */
+public interface FlowMultiplexer {
+ /**
+ * Return maximum number of inputs supported by the multiplexer.
+ */
+ int getMaxInputs();
+
+ /**
+ * Return maximum number of outputs supported by the multiplexer.
+ */
+ int getMaxOutputs();
+
+ /**
+ * Return the number of active inputs on this multiplexer.
+ */
+ int getInputCount();
+
+ /**
+ * Allocate a new input on this multiplexer with the specified capacity..
+ *
+ * @return The identifier of the input for this stage.
+ */
+ Inlet newInput();
+
+ /**
+ * Release the input at the specified slot.
+ *
+ * @param inlet The inlet to release.
+ */
+ void releaseInput(Inlet inlet);
+
+ /**
+ * Return the number of active outputs on this multiplexer.
+ */
+ int getOutputCount();
+
+ /**
+ * Allocate a new output on this multiplexer.
+ *
+ * @return The outlet for this stage.
+ */
+ Outlet newOutput();
+
+ /**
+ * Release the output at the specified slot.
+ *
+ * @param outlet The outlet to release.
+ */
+ void releaseOutput(Outlet outlet);
+
+ /**
+ * Return the total input capacity of the {@link FlowMultiplexer}.
+ */
+ float getCapacity();
+
+ /**
+ * Return the total input demand for the {@link FlowMultiplexer}.
+ */
+ float getDemand();
+
+ /**
+ * Return the total input rate for the {@link FlowMultiplexer}.
+ */
+ float getRate();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java
new file mode 100644
index 00000000..0b5b9141
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import org.opendc.simulator.flow2.FlowGraph;
+
+/**
+ * Factory interface for a {@link FlowMultiplexer} implementation.
+ */
+public interface FlowMultiplexerFactory {
+ /**
+ * Construct a new {@link FlowMultiplexer} belonging to the specified {@link FlowGraph}.
+ *
+ * @param graph The graph to which the multiplexer belongs.
+ */
+ FlowMultiplexer newMultiplexer(FlowGraph graph);
+
+ /**
+ * Return a {@link FlowMultiplexerFactory} for {@link ForwardingFlowMultiplexer} instances.
+ */
+ static FlowMultiplexerFactory forwardingMultiplexer() {
+ return ForwardingFlowMultiplexer.FACTORY;
+ }
+
+ /**
+ * Return a {@link FlowMultiplexerFactory} for {@link MaxMinFlowMultiplexer} instances.
+ */
+ static FlowMultiplexerFactory maxMinMultiplexer() {
+ return MaxMinFlowMultiplexer.FACTORY;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java
new file mode 100644
index 00000000..abe3510b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.InHandler;
+import org.opendc.simulator.flow2.InPort;
+import org.opendc.simulator.flow2.Inlet;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively.
+ * This means that a single input is directly connected to an output and that the multiplexer can only support as many
+ * inputs as outputs.
+ */
+public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic {
+ /**
+ * Factory implementation for this implementation.
+ */
+ static FlowMultiplexerFactory FACTORY = ForwardingFlowMultiplexer::new;
+
+ public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler();
+ public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler();
+
+ private final FlowStage stage;
+
+ private InPort[] inlets;
+ private OutPort[] outlets;
+ private final BitSet activeInputs;
+ private final BitSet activeOutputs;
+ private final BitSet availableOutputs;
+
+ private float capacity = 0.f;
+ private float demand = 0.f;
+
+ public ForwardingFlowMultiplexer(FlowGraph graph) {
+ this.stage = graph.newStage(this);
+
+ this.inlets = new InPort[4];
+ this.activeInputs = new BitSet();
+ this.outlets = new OutPort[4];
+ this.activeOutputs = new BitSet();
+ this.availableOutputs = new BitSet();
+ }
+
+ @Override
+ public float getCapacity() {
+ return capacity;
+ }
+
+ @Override
+ public float getDemand() {
+ return demand;
+ }
+
+ @Override
+ public float getRate() {
+ final BitSet activeOutputs = this.activeOutputs;
+ final OutPort[] outlets = this.outlets;
+ float rate = 0.f;
+ for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) {
+ rate += outlets[i].getRate();
+ }
+ return rate;
+ }
+
+ @Override
+ public int getMaxInputs() {
+ return getOutputCount();
+ }
+
+ @Override
+ public int getMaxOutputs() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int getInputCount() {
+ return activeInputs.length();
+ }
+
+ @Override
+ public Inlet newInput() {
+ final BitSet activeInputs = this.activeInputs;
+ int slot = activeInputs.nextClearBit(0);
+
+ InPort inPort = stage.getInlet("in" + slot);
+ inPort.setMask(true);
+
+ InPort[] inlets = this.inlets;
+ if (slot >= inlets.length) {
+ int newLength = inlets.length + (inlets.length >> 1);
+ inlets = Arrays.copyOf(inlets, newLength);
+ this.inlets = inlets;
+ }
+
+ final BitSet availableOutputs = this.availableOutputs;
+ int outSlot = availableOutputs.nextSetBit(0);
+
+ if (outSlot < 0) {
+ throw new IllegalStateException("No capacity available for a new input");
+ }
+
+ inlets[slot] = inPort;
+ activeInputs.set(slot);
+
+ OutPort outPort = outlets[outSlot];
+ availableOutputs.clear(outSlot);
+
+ inPort.setHandler(new ForwardingInHandler(outPort));
+ outPort.setHandler(new ForwardingOutHandler(inPort));
+
+ inPort.pull(outPort.getCapacity());
+
+ return inPort;
+ }
+
+ @Override
+ public void releaseInput(Inlet inlet) {
+ InPort port = (InPort) inlet;
+ int slot = port.getId();
+
+ final BitSet activeInputs = this.activeInputs;
+
+ if (!activeInputs.get(slot)) {
+ return;
+ }
+
+ port.cancel(null);
+ activeInputs.clear(slot);
+
+ ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler();
+ availableOutputs.set(inHandler.output.getId());
+
+ port.setHandler(IDLE_IN_HANDLER);
+ }
+
+ @Override
+ public int getOutputCount() {
+ return activeOutputs.length();
+ }
+
+ @Override
+ public Outlet newOutput() {
+ final BitSet activeOutputs = this.activeOutputs;
+ int slot = activeOutputs.nextClearBit(0);
+
+ OutPort port = stage.getOutlet("out" + slot);
+ OutPort[] outlets = this.outlets;
+ if (slot >= outlets.length) {
+ int newLength = outlets.length + (outlets.length >> 1);
+ outlets = Arrays.copyOf(outlets, newLength);
+ this.outlets = outlets;
+ }
+ outlets[slot] = port;
+
+ activeOutputs.set(slot);
+ availableOutputs.set(slot);
+ return port;
+ }
+
+ @Override
+ public void releaseOutput(Outlet outlet) {
+ OutPort port = (OutPort) outlet;
+ int slot = port.getId();
+ activeInputs.clear(slot);
+ availableOutputs.clear(slot);
+ port.complete();
+
+ port.setHandler(IDLE_OUT_HANDLER);
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ return Long.MAX_VALUE;
+ }
+
+ class ForwardingInHandler implements InHandler {
+ final OutPort output;
+
+ ForwardingInHandler(OutPort output) {
+ this.output = output;
+ }
+
+ @Override
+ public float getRate(InPort port) {
+ return output.getRate();
+ }
+
+ @Override
+ public void onPush(InPort port, float rate) {
+ ForwardingFlowMultiplexer.this.demand += -port.getDemand() + rate;
+
+ output.push(rate);
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ ForwardingFlowMultiplexer.this.demand -= port.getDemand();
+
+ final OutPort output = this.output;
+ output.push(0.f);
+
+ releaseInput(port);
+ }
+ }
+
+ private class ForwardingOutHandler implements OutHandler {
+ private final InPort input;
+
+ ForwardingOutHandler(InPort input) {
+ this.input = input;
+ }
+
+ @Override
+ public void onPull(OutPort port, float capacity) {
+ ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity;
+
+ input.pull(capacity);
+ }
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ ForwardingFlowMultiplexer.this.capacity -= port.getCapacity();
+
+ input.cancel(cause);
+
+ releaseOutput(port);
+ }
+ }
+
+ private static class IdleInHandler implements InHandler {
+ @Override
+ public float getRate(InPort port) {
+ return 0.f;
+ }
+
+ @Override
+ public void onPush(InPort port, float rate) {
+ port.cancel(new IllegalStateException("Inlet is not allocated"));
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {}
+ }
+
+ private static class IdleOutHandler implements OutHandler {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {}
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java
new file mode 100644
index 00000000..ac5c4f5c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java
@@ -0,0 +1,297 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.InHandler;
+import org.opendc.simulator.flow2.InPort;
+import org.opendc.simulator.flow2.Inlet;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs
+ * using max-min fair sharing.
+ * <p>
+ * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined
+ * output capacity, but allows individual inputs to use more capacity if there is still capacity left.
+ */
+public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic {
+ /**
+ * Factory implementation for this implementation.
+ */
+ static FlowMultiplexerFactory FACTORY = MaxMinFlowMultiplexer::new;
+
+ private final FlowStage stage;
+ private final BitSet activeInputs;
+ private final BitSet activeOutputs;
+
+ private float capacity = 0.f;
+ private float demand = 0.f;
+ private float rate = 0.f;
+
+ private InPort[] inlets;
+ private long[] inputs;
+ private float[] rates;
+ private OutPort[] outlets;
+
+ private final MultiplexerInHandler inHandler = new MultiplexerInHandler();
+ private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler();
+
+ /**
+ * Construct a {@link MaxMinFlowMultiplexer} instance.
+ *
+ * @param graph The {@link FlowGraph} to add the multiplexer to.
+ */
+ public MaxMinFlowMultiplexer(FlowGraph graph) {
+ this.stage = graph.newStage(this);
+ this.activeInputs = new BitSet();
+ this.activeOutputs = new BitSet();
+
+ this.inlets = new InPort[4];
+ this.inputs = new long[4];
+ this.rates = new float[4];
+ this.outlets = new OutPort[4];
+ }
+
+ @Override
+ public float getCapacity() {
+ return capacity;
+ }
+
+ @Override
+ public float getDemand() {
+ return demand;
+ }
+
+ @Override
+ public float getRate() {
+ return rate;
+ }
+
+ @Override
+ public int getMaxInputs() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public int getMaxOutputs() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ float capacity = this.capacity;
+ float demand = this.demand;
+ float rate = demand;
+
+ if (demand > capacity) {
+ rate = redistributeCapacity(inlets, inputs, rates, capacity);
+ }
+
+ if (this.rate != rate) {
+ // Only update the outputs if the output rate has changed
+ this.rate = rate;
+
+ changeRate(activeOutputs, outlets, capacity, rate);
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public int getInputCount() {
+ return activeInputs.length();
+ }
+
+ @Override
+ public Inlet newInput() {
+ final BitSet activeInputs = this.activeInputs;
+ int slot = activeInputs.nextClearBit(0);
+
+ InPort port = stage.getInlet("in" + slot);
+ port.setHandler(inHandler);
+ port.pull(this.capacity);
+
+ InPort[] inlets = this.inlets;
+ if (slot >= inlets.length) {
+ int newLength = inlets.length + (inlets.length >> 1);
+ inlets = Arrays.copyOf(inlets, newLength);
+ inputs = Arrays.copyOf(inputs, newLength);
+ rates = Arrays.copyOf(rates, newLength);
+ this.inlets = inlets;
+ }
+ inlets[slot] = port;
+
+ activeInputs.set(slot);
+ return port;
+ }
+
+ @Override
+ public void releaseInput(Inlet inlet) {
+ InPort port = (InPort) inlet;
+
+ activeInputs.clear(port.getId());
+ port.cancel(null);
+ }
+
+ @Override
+ public int getOutputCount() {
+ return activeOutputs.length();
+ }
+
+ @Override
+ public Outlet newOutput() {
+ final BitSet activeOutputs = this.activeOutputs;
+ int slot = activeOutputs.nextClearBit(0);
+
+ OutPort port = stage.getOutlet("out" + slot);
+ port.setHandler(outHandler);
+
+ OutPort[] outlets = this.outlets;
+ if (slot >= outlets.length) {
+ int newLength = outlets.length + (outlets.length >> 1);
+ outlets = Arrays.copyOf(outlets, newLength);
+ this.outlets = outlets;
+ }
+ outlets[slot] = port;
+
+ activeOutputs.set(slot);
+ return port;
+ }
+
+ @Override
+ public void releaseOutput(Outlet outlet) {
+ OutPort port = (OutPort) outlet;
+ activeInputs.clear(port.getId());
+ port.complete();
+ }
+
+ /**
+ * Helper function to redistribute the specified capacity across the inlets.
+ */
+ private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) {
+ // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
+ // constrained capacity across the inputs.
+ for (int i = 0; i < inputs.length; i++) {
+ InPort inlet = inlets[i];
+ if (inlet == null) {
+ break;
+ }
+
+ inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL);
+ }
+ Arrays.sort(inputs);
+
+ float availableCapacity = capacity;
+ int inputSize = inputs.length;
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ for (int i = 0; i < inputs.length; i++) {
+ long v = inputs[i];
+ int slot = (int) v;
+ float d = Float.intBitsToFloat((int) (v >> 32));
+
+ if (d == 0.0) {
+ continue;
+ }
+
+ float availableShare = availableCapacity / (inputSize - i);
+ float r = Math.min(d, availableShare);
+
+ rates[slot] = r;
+ availableCapacity -= r;
+ }
+
+ return capacity - availableCapacity;
+ }
+
+ /**
+ * Helper method to change the rate of the outlets.
+ */
+ private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) {
+ // Divide the requests over the available capacity of the input resources fairly
+ for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) {
+ OutPort outlet = outlets[i];
+ float fraction = outlet.getCapacity() / capacity;
+ outlet.push(rate * fraction);
+ }
+ }
+
+ /**
+ * A {@link InHandler} implementation for the multiplexer inputs.
+ */
+ private class MultiplexerInHandler implements InHandler {
+ @Override
+ public float getRate(InPort port) {
+ return rates[port.getId()];
+ }
+
+ @Override
+ public void onPush(InPort port, float demand) {
+ MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand;
+ rates[port.getId()] = demand;
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ MaxMinFlowMultiplexer.this.demand -= port.getDemand();
+ releaseInput(port);
+ rates[port.getId()] = 0.f;
+ }
+ }
+
+ /**
+ * A {@link OutHandler} implementation for the multiplexer outputs.
+ */
+ private class MultiplexerOutHandler implements OutHandler {
+ @Override
+ public void onPull(OutPort port, float capacity) {
+ float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity;
+ MaxMinFlowMultiplexer.this.capacity = newCapacity;
+ changeInletCapacity(newCapacity);
+ }
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity();
+ MaxMinFlowMultiplexer.this.capacity = newCapacity;
+ releaseOutput(port);
+ changeInletCapacity(newCapacity);
+ }
+
+ private void changeInletCapacity(float capacity) {
+ BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs;
+ InPort[] inlets = MaxMinFlowMultiplexer.this.inlets;
+
+ for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) {
+ inlets[i].pull(capacity);
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java
index 450195ec..69c94708 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,9 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow.internal
+package org.opendc.simulator.flow2.sink;
+
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.Inlet;
/**
- * Constant for converting milliseconds into seconds.
+ * A {@link FlowStage} with a single input.
*/
-internal const val D_MS_TO_S = 1 / 1000.0
+public interface FlowSink {
+ /**
+ * Return the input of this {@link FlowSink}.
+ */
+ Inlet getInput();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java
new file mode 100644
index 00000000..fdfe5ee8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.sink;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.InHandler;
+import org.opendc.simulator.flow2.InPort;
+import org.opendc.simulator.flow2.Inlet;
+
+/**
+ * A sink with a fixed capacity.
+ */
+public final class SimpleFlowSink implements FlowSink, FlowStageLogic {
+ private final FlowStage stage;
+ private final InPort input;
+ private final Handler handler;
+
+ /**
+ * Construct a new {@link SimpleFlowSink} with the specified initial capacity.
+ *
+ * @param graph The graph to add the sink to.
+ * @param initialCapacity The initial capacity of the sink.
+ */
+ public SimpleFlowSink(FlowGraph graph, float initialCapacity) {
+ this.stage = graph.newStage(this);
+ this.handler = new Handler();
+ this.input = stage.getInlet("in");
+ this.input.pull(initialCapacity);
+ this.input.setMask(true);
+ this.input.setHandler(handler);
+ }
+
+ /**
+ * Return the {@link Inlet} of this sink.
+ */
+ @Override
+ public Inlet getInput() {
+ return input;
+ }
+
+ /**
+ * Return the capacity of the sink.
+ */
+ public float getCapacity() {
+ return input.getCapacity();
+ }
+
+ /**
+ * Update the capacity of the sink.
+ *
+ * @param capacity The new capacity to update the sink to.
+ */
+ public void setCapacity(float capacity) {
+ input.pull(capacity);
+ stage.invalidate();
+ }
+
+ /**
+ * Return the flow rate of the sink.
+ */
+ public float getRate() {
+ return input.getRate();
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ InPort input = this.input;
+ handler.rate = Math.min(input.getDemand(), input.getCapacity());
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * The {@link InHandler} implementation for the sink.
+ */
+ private static final class Handler implements InHandler {
+ float rate;
+
+ @Override
+ public float getRate(InPort port) {
+ return rate;
+ }
+
+ @Override
+ public void onPush(InPort port, float demand) {
+ float capacity = port.getCapacity();
+ rate = Math.min(demand, capacity);
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ rate = 0.f;
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java
index 8ff0bc76..2dcc66e4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,53 +20,46 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
/**
- * An active connection between a [FlowSource] and [FlowConsumer].
+ * An empty {@link FlowSource}.
*/
-public interface FlowConnection : AutoCloseable {
- /**
- * The capacity of the connection.
- */
- public val capacity: Double
-
- /**
- * The flow rate over the connection.
- */
- public val rate: Double
-
- /**
- * The flow demand of the source.
- */
- public val demand: Double
+public final class EmptyFlowSource implements FlowSource, FlowStageLogic {
+ private final FlowStage stage;
+ private final OutPort output;
/**
- * A flag to control whether [FlowSource.onConverge] should be invoked for this source.
+ * Construct a new {@link EmptyFlowSource}.
*/
- public var shouldSourceConverge: Boolean
+ public EmptyFlowSource(FlowGraph graph) {
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ }
/**
- * Pull the source.
+ * Return the {@link Outlet} of the source.
*/
- public fun pull()
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
/**
- * Pull the source.
- *
- * @param now The timestamp at which the connection is pulled.
+ * Remove this node from the graph.
*/
- public fun pull(now: Long)
+ public void close() {
+ stage.close();
+ }
- /**
- * Push the given flow [rate] over this connection.
- *
- * @param rate The rate of the flow to push.
- */
- public fun push(rate: Double)
-
- /**
- * Disconnect the consumer from its source.
- */
- public override fun close()
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ return Long.MAX_VALUE;
+ }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java
new file mode 100644
index 00000000..f9432c33
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowStage} with a single output.
+ */
+public interface FlowSource {
+ /**
+ * Return the output of this {@link FlowSource}.
+ */
+ Outlet getOutput();
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java
new file mode 100644
index 00000000..c09987cd
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import java.util.function.Consumer;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization.
+ */
+public class RuntimeFlowSource implements FlowSource, FlowStageLogic {
+ private final float utilization;
+
+ private final FlowStage stage;
+ private final OutPort output;
+ private final Consumer<RuntimeFlowSource> completionHandler;
+
+ private long duration;
+ private long lastPull;
+
+ /**
+ * Construct a {@link RuntimeFlowSource} instance.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param duration The duration of the source.
+ * @param utilization The utilization of the capacity of the outlet.
+ * @param completionHandler A callback invoked when the source completes.
+ */
+ public RuntimeFlowSource(
+ FlowGraph graph, long duration, float utilization, Consumer<RuntimeFlowSource> completionHandler) {
+ if (duration <= 0) {
+ throw new IllegalArgumentException("Duration must be positive and non-zero");
+ }
+
+ if (utilization <= 0.0) {
+ throw new IllegalArgumentException("Utilization must be positive and non-zero");
+ }
+
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ this.output.setHandler(new OutHandler() {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ // Source cannot complete without re-connecting to another sink, so mark the source as completed
+ completionHandler.accept(RuntimeFlowSource.this);
+ }
+ });
+ this.duration = duration;
+ this.utilization = utilization;
+ this.completionHandler = completionHandler;
+ this.lastPull = graph.getEngine().getClock().millis();
+ }
+
+ /**
+ * Construct a new {@link RuntimeFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param duration The duration of the source.
+ * @param utilization The utilization of the capacity of the outlet.
+ */
+ public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) {
+ this(graph, duration, utilization, RuntimeFlowSource::close);
+ }
+
+ /**
+ * Return the {@link Outlet} of the source.
+ */
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ long lastPull = this.lastPull;
+ this.lastPull = now;
+
+ long delta = Math.max(0, now - lastPull);
+
+ OutPort output = this.output;
+ float limit = output.getCapacity() * utilization;
+ long duration = this.duration - delta;
+
+ if (duration <= 0) {
+ completionHandler.accept(this);
+ return Long.MAX_VALUE;
+ }
+
+ this.duration = duration;
+ output.push(limit);
+ return now + duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java
new file mode 100644
index 00000000..a0e9cb9d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import java.util.function.Consumer;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A flow source that contains a fixed amount and is pushed with a given utilization.
+ */
+public final class SimpleFlowSource implements FlowSource, FlowStageLogic {
+ private final float utilization;
+ private float remainingAmount;
+ private long lastPull;
+
+ private final FlowStage stage;
+ private final OutPort output;
+ private final Consumer<SimpleFlowSource> completionHandler;
+
+ /**
+ * Construct a new {@link SimpleFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param amount The amount to transfer via the outlet.
+ * @param utilization The utilization of the capacity of the outlet.
+ * @param completionHandler A callback invoked when the source completes.
+ */
+ public SimpleFlowSource(
+ FlowGraph graph, float amount, float utilization, Consumer<SimpleFlowSource> completionHandler) {
+ if (amount < 0.0) {
+ throw new IllegalArgumentException("Amount must be non-negative");
+ }
+
+ if (utilization <= 0.0) {
+ throw new IllegalArgumentException("Utilization must be positive and non-zero");
+ }
+
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ this.output.setHandler(new OutHandler() {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ // Source cannot complete without re-connecting to another sink, so mark the source as completed
+ completionHandler.accept(SimpleFlowSource.this);
+ }
+ });
+ this.completionHandler = completionHandler;
+ this.utilization = utilization;
+ this.remainingAmount = amount;
+ this.lastPull = graph.getEngine().getClock().millis();
+ }
+
+ /**
+ * Construct a new {@link SimpleFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which this source belongs.
+ * @param amount The amount to transfer via the outlet.
+ * @param utilization The utilization of the capacity of the outlet.
+ */
+ public SimpleFlowSource(FlowGraph graph, float amount, float utilization) {
+ this(graph, amount, utilization, SimpleFlowSource::close);
+ }
+
+ /**
+ * Return the {@link Outlet} of the source.
+ */
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ long lastPull = this.lastPull;
+ this.lastPull = now;
+
+ long delta = Math.max(0, now - lastPull);
+
+ OutPort output = this.output;
+ float consumed = output.getRate() * delta / 1000.f;
+ float limit = output.getCapacity() * utilization;
+
+ float remainingAmount = this.remainingAmount - consumed;
+ this.remainingAmount = remainingAmount;
+
+ long duration = (long) Math.ceil(remainingAmount / limit * 1000);
+
+ if (duration <= 0) {
+ completionHandler.accept(this);
+ return Long.MAX_VALUE;
+ }
+
+ output.push(limit);
+ return now + duration;
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java
new file mode 100644
index 00000000..e8abc2d7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.source;
+
+import java.util.function.Consumer;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time.
+ */
+public final class TraceFlowSource implements FlowSource, FlowStageLogic {
+ private final OutPort output;
+ private final long[] deadlines;
+ private final float[] usages;
+ private final int size;
+ private int index;
+
+ private final FlowStage stage;
+ private final Consumer<TraceFlowSource> completionHandler;
+
+ /**
+ * Construct a {@link TraceFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which the source belongs.
+ * @param trace The {@link Trace} to replay.
+ * @param completionHandler The completion handler to invoke when the source finishes.
+ */
+ public TraceFlowSource(FlowGraph graph, Trace trace, Consumer<TraceFlowSource> completionHandler) {
+ this.stage = graph.newStage(this);
+ this.output = stage.getOutlet("out");
+ this.output.setHandler(new OutHandler() {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ // Source cannot complete without re-connecting to another sink, so mark the source as completed
+ completionHandler.accept(TraceFlowSource.this);
+ }
+ });
+ this.deadlines = trace.deadlines;
+ this.usages = trace.usages;
+ this.size = trace.size;
+ this.completionHandler = completionHandler;
+ }
+
+ /**
+ * Construct a {@link TraceFlowSource}.
+ *
+ * @param graph The {@link FlowGraph} to which the source belongs.
+ * @param trace The {@link Trace} to replay.
+ */
+ public TraceFlowSource(FlowGraph graph, Trace trace) {
+ this(graph, trace, TraceFlowSource::close);
+ }
+
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Remove this node from the graph.
+ */
+ public void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ int size = this.size;
+ int index = this.index;
+ long[] deadlines = this.deadlines;
+ long deadline;
+
+ do {
+ deadline = deadlines[index];
+ } while (deadline <= now && ++index < size);
+
+ if (index >= size) {
+ output.push(0.0f);
+ completionHandler.accept(this);
+ return Long.MAX_VALUE;
+ }
+
+ this.index = index;
+ float usage = usages[index];
+ output.push(usage);
+
+ return deadline;
+ }
+
+ /**
+ * A trace describes the workload over time.
+ */
+ public static final class Trace {
+ private final long[] deadlines;
+ private final float[] usages;
+ private final int size;
+
+ /**
+ * Construct a {@link Trace}.
+ *
+ * @param deadlines The deadlines of the trace fragments.
+ * @param usages The usages of the trace fragments.
+ * @param size The size of the trace.
+ */
+ public Trace(long[] deadlines, float[] usages, int size) {
+ this.deadlines = deadlines;
+ this.usages = usages;
+ this.size = size;
+ }
+
+ public long[] getDeadlines() {
+ return deadlines;
+ }
+
+ public float[] getUsages() {
+ return usages;
+ }
+
+ public int getSize() {
+ return size;
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java
index d8ad7978..51ea7df3 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,29 +20,22 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow
+package org.opendc.simulator.flow2.util;
+
+import org.opendc.simulator.flow2.FlowGraph;
/**
- * An interface that tracks cumulative counts of the flow accumulation over a stage.
+ * A {@link FlowTransform} describes a transformation between two components in a {@link FlowGraph} that might operate
+ * at different units of flow.
*/
-public interface FlowCounters {
- /**
- * The accumulated flow that a source wanted to push over the connection.
- */
- public val demand: Double
-
- /**
- * The accumulated flow that was actually transferred over the connection.
- */
- public val actual: Double
-
+public interface FlowTransform {
/**
- * The amount of capacity that was not utilized.
+ * Apply the transform to the specified flow rate.
*/
- public val remaining: Double
+ float apply(float value);
/**
- * Reset the flow counters.
+ * Apply the inverse of the transformation to the specified flow rate.
*/
- public fun reset()
+ float applyInverse(float value);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java
new file mode 100644
index 00000000..852240d8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.util;
+
+import org.opendc.simulator.flow2.*;
+import org.opendc.simulator.flow2.sink.FlowSink;
+import org.opendc.simulator.flow2.source.FlowSource;
+
+/**
+ * Helper class to transform flow from outlet to inlet.
+ */
+public final class FlowTransformer implements FlowStageLogic, FlowSource, FlowSink {
+ private final FlowStage stage;
+ private final InPort input;
+ private final OutPort output;
+
+ /**
+ * Construct a new {@link FlowTransformer}.
+ */
+ public FlowTransformer(FlowGraph graph, FlowTransform transform) {
+ this.stage = graph.newStage(this);
+ this.input = stage.getInlet("in");
+ this.output = stage.getOutlet("out");
+
+ this.input.setHandler(new ForwardInHandler(output, transform));
+ this.input.setMask(true);
+ this.output.setHandler(new ForwardOutHandler(input, transform));
+ this.output.setMask(true);
+ }
+
+ /**
+ * Return the {@link Outlet} of the transformer.
+ */
+ @Override
+ public Outlet getOutput() {
+ return output;
+ }
+
+ /**
+ * Return the {@link Inlet} of the transformer.
+ */
+ @Override
+ public Inlet getInput() {
+ return input;
+ }
+
+ /**
+ * Close the transformer.
+ */
+ void close() {
+ stage.close();
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ return Long.MAX_VALUE;
+ }
+
+ private static class ForwardInHandler implements InHandler {
+ private final OutPort output;
+ private final FlowTransform transform;
+
+ ForwardInHandler(OutPort output, FlowTransform transform) {
+ this.output = output;
+ this.transform = transform;
+ }
+
+ @Override
+ public float getRate(InPort port) {
+ return transform.applyInverse(output.getRate());
+ }
+
+ @Override
+ public void onPush(InPort port, float demand) {
+ float rate = transform.apply(demand);
+ output.push(rate);
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ output.fail(cause);
+ }
+ }
+
+ private static class ForwardOutHandler implements OutHandler {
+ private final InPort input;
+ private final FlowTransform transform;
+
+ ForwardOutHandler(InPort input, FlowTransform transform) {
+ this.input = input;
+ this.transform = transform;
+ }
+
+ @Override
+ public void onPull(OutPort port, float capacity) {
+ input.pull(transform.applyInverse(capacity));
+ }
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ input.cancel(cause);
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java
index c320a362..428dbfca 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 AtLarge Research
+ * Copyright (c) 2022 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,34 +20,38 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow.internal
-
-import org.opendc.simulator.flow.FlowCounters
+package org.opendc.simulator.flow2.util;
/**
- * Mutable implementation of the [FlowCounters] interface.
+ * A collection of common {@link FlowTransform} implementations.
*/
-public class MutableFlowCounters : FlowCounters {
- override val demand: Double
- get() = _counters[0]
- override val actual: Double
- get() = _counters[1]
- override val remaining: Double
- get() = _counters[2]
- private val _counters = DoubleArray(3)
+public class FlowTransforms {
+ /**
+ * Prevent construction of this class.
+ */
+ private FlowTransforms() {}
- override fun reset() {
- _counters.fill(0.0)
+ /**
+ * Return a {@link FlowTransform} that forwards the flow rate unmodified.
+ */
+ public static FlowTransform noop() {
+ return NoopFlowTransform.INSTANCE;
}
- public fun increment(demand: Double, actual: Double, remaining: Double) {
- val counters = _counters
- counters[0] += demand
- counters[1] += actual
- counters[2] += remaining
- }
+ /**
+ * No-op implementation of a {@link FlowTransform}.
+ */
+ private static final class NoopFlowTransform implements FlowTransform {
+ static final NoopFlowTransform INSTANCE = new NoopFlowTransform();
+
+ @Override
+ public float apply(float value) {
+ return value;
+ }
- override fun toString(): String {
- return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
+ @Override
+ public float applyInverse(float value) {
+ return value;
+ }
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
deleted file mode 100644
index a49826f4..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import kotlinx.coroutines.suspendCancellableCoroutine
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-
-/**
- * A consumer of a [FlowSource].
- */
-public interface FlowConsumer {
- /**
- * A flag to indicate that the consumer is currently consuming a [FlowSource].
- */
- public val isActive: Boolean
-
- /**
- * The flow capacity of this consumer.
- */
- public val capacity: Double
-
- /**
- * The current flow rate of the consumer.
- */
- public val rate: Double
-
- /**
- * The current flow demand.
- */
- public val demand: Double
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public val counters: FlowCounters
-
- /**
- * Start consuming the specified [source].
- *
- * @throws IllegalStateException if the consumer is already active.
- */
- public fun startConsumer(source: FlowSource)
-
- /**
- * Ask the consumer to pull its source.
- *
- * If the consumer is not active, this operation will be a no-op.
- */
- public fun pull()
-
- /**
- * Disconnect the consumer from its source.
- *
- * If the consumer is not active, this operation will be a no-op.
- */
- public fun cancel()
-}
-
-/**
- * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
- */
-public suspend fun FlowConsumer.consume(source: FlowSource) {
- return suspendCancellableCoroutine { cont ->
- startConsumer(object : FlowSource {
- override fun onStart(conn: FlowConnection, now: Long) {
- try {
- source.onStart(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- try {
- source.onStop(conn, now)
-
- if (!cont.isCompleted) {
- cont.resume(Unit)
- }
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return try {
- source.onPull(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- source.onConverge(conn, now)
- } catch (cause: Throwable) {
- cont.resumeWithException(cause)
- throw cause
- }
- }
-
- override fun toString(): String = "SuspendingFlowSource"
- })
-
- cont.invokeOnCancellation { cancel() }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
deleted file mode 100644
index 1d3adb10..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A collection of callbacks associated with a [FlowConsumer].
- */
-public interface FlowConsumerLogic {
- /**
- * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the update is occurring.
- * @param rate The requested processing rate of the source.
- */
- public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {}
-
- /**
- * This method is invoked when the flow graph has converged into a steady-state system.
- *
- * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method
- * will not be invoked.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the system converged.
- */
- public fun onConverge(ctx: FlowConsumerContext, now: Long) {}
-
- /**
- * This method is invoked when the [FlowSource] completed or failed.
- *
- * @param ctx The context in which the provider runs.
- * @param now The virtual timestamp in milliseconds at which the provider finished.
- * @param cause The cause of the failure or `null` if the source completed.
- */
- public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
deleted file mode 100644
index 65224827..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.FlowEngineImpl
-import java.time.Clock
-import kotlin.coroutines.CoroutineContext
-
-/**
- * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
- *
- * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
- * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
- */
-public interface FlowEngine {
- /**
- * The virtual [Clock] associated with this engine.
- */
- public val clock: Clock
-
- /**
- * Create a new [FlowConsumerContext] with the given [provider].
- *
- * @param consumer The consumer logic.
- * @param provider The logic of the resource provider.
- */
- public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
-
- /**
- * Start batching the execution of resource updates until [popBatch] is called.
- *
- * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
- * simultaneously) in a single state update.
- *
- * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
- * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
- * the same amount of times. To simplify batching, see [batch].
- */
- public fun pushBatch()
-
- /**
- * Stop the batching of resource updates and run the interpreter on the batch.
- *
- * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
- */
- public fun popBatch()
-
- public companion object {
- /**
- * Construct a new [FlowEngine] implementation.
- *
- * @param context The coroutine context to use.
- * @param clock The virtual simulation clock.
- */
- @JvmStatic
- @JvmName("create")
- public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
- return FlowEngineImpl(context, clock)
- }
- }
-}
-
-/**
- * Batch the execution of several interrupts into a single call.
- *
- * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
- */
-public inline fun FlowEngine.batch(block: () -> Unit) {
- try {
- pushBatch()
- block()
- } finally {
- popBatch()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
deleted file mode 100644
index 5202c252..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import mu.KotlinLogging
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * The logging instance of this connection.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
- *
- * @param engine The [FlowEngine] the forwarder runs in.
- * @param listener The convergence lister to use.
- * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
- */
-public class FlowForwarder(
- private val engine: FlowEngine,
- private val listener: FlowConvergenceListener? = null,
- private val isCoupled: Boolean = false
-) : FlowSource, FlowConsumer, AutoCloseable {
- /**
- * The delegate [FlowSource].
- */
- private var delegate: FlowSource? = null
-
- /**
- * A flag to indicate that the delegate was started.
- */
- private var hasDelegateStarted: Boolean = false
-
- /**
- * The exposed [FlowConnection].
- */
- private val _ctx = object : FlowConnection {
- override var shouldSourceConverge: Boolean = false
- set(value) {
- field = value
- _innerCtx?.shouldSourceConverge = value
- }
-
- override val capacity: Double
- get() = _innerCtx?.capacity ?: 0.0
-
- override val demand: Double
- get() = _innerCtx?.demand ?: 0.0
-
- override val rate: Double
- get() = _innerCtx?.rate ?: 0.0
-
- override fun pull() {
- _innerCtx?.pull()
- }
-
- override fun pull(now: Long) {
- _innerCtx?.pull(now)
- }
-
- override fun push(rate: Double) {
- if (delegate == null) {
- return
- }
-
- _innerCtx?.push(rate)
- _demand = rate
- }
-
- override fun close() {
- val delegate = delegate ?: return
- val hasDelegateStarted = hasDelegateStarted
-
- // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
- // reset beforehand the existing state and check whether it has been updated afterwards
- reset()
-
- if (hasDelegateStarted) {
- val now = engine.clock.millis()
- delegate.onStop(this, now)
- }
- }
- }
-
- /**
- * The [FlowConnection] in which the forwarder runs.
- */
- private var _innerCtx: FlowConnection? = null
-
- override val isActive: Boolean
- get() = delegate != null
-
- override val capacity: Double
- get() = _ctx.capacity
-
- override val rate: Double
- get() = _ctx.rate
-
- override val demand: Double
- get() = _ctx.demand
-
- override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- override fun startConsumer(source: FlowSource) {
- check(delegate == null) { "Forwarder already active" }
-
- delegate = source
-
- // Pull to replace the source
- pull()
- }
-
- override fun pull() {
- _ctx.pull()
- }
-
- override fun cancel() {
- _ctx.close()
- }
-
- override fun close() {
- val ctx = _innerCtx
-
- if (ctx != null) {
- this._innerCtx = null
- ctx.pull()
- }
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- _innerCtx = conn
-
- if (listener != null || _ctx.shouldSourceConverge) {
- conn.shouldSourceConverge = true
- }
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _innerCtx = null
-
- val delegate = delegate
- if (delegate != null) {
- reset()
-
- try {
- delegate.onStop(this._ctx, now)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- }
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val delegate = delegate
-
- if (!hasDelegateStarted) {
- start()
- }
-
- updateCounters(conn, now)
-
- return try {
- delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- reset()
- Long.MAX_VALUE
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- delegate?.onConverge(this._ctx, now)
- listener?.onConverge(now)
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
-
- _innerCtx = null
- reset()
- }
- }
-
- /**
- * Start the delegate.
- */
- private fun start() {
- val delegate = delegate ?: return
-
- try {
- val now = engine.clock.millis()
- delegate.onStart(_ctx, now)
- hasDelegateStarted = true
- _lastUpdate = now
- } catch (cause: Throwable) {
- logger.error(cause) { "Uncaught exception" }
- reset()
- }
- }
-
- /**
- * Reset the delegate.
- */
- private fun reset() {
- if (isCoupled) {
- _innerCtx?.close()
- } else {
- _innerCtx?.push(0.0)
- }
-
- delegate = null
- hasDelegateStarted = false
- }
-
- /**
- * The requested flow rate.
- */
- private var _demand: Double = 0.0
- private var _lastUpdate = 0L
-
- /**
- * Update the flow counters for the transformer.
- */
- private fun updateCounters(ctx: FlowConnection, now: Long) {
- val lastUpdate = _lastUpdate
- _lastUpdate = now
- val delta = now - lastUpdate
- if (delta <= 0) {
- return
- }
-
- val counters = _counters
- val deltaS = delta * D_MS_TO_S
- val total = ctx.capacity * deltaS
- val work = _demand * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.increment(work, actualWork, (total - actualWork))
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
deleted file mode 100644
index af702701..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A [FlowConsumer] that maps the pushed flow through [transform].
- *
- * @param source The source of the flow.
- * @param transform The method to transform the flow.
- */
-public class FlowMapper(
- private val source: FlowSource,
- private val transform: (FlowConnection, Double) -> Double
-) : FlowSource {
-
- /**
- * The current active connection.
- */
- private var _conn: Connection? = null
-
- override fun onStart(conn: FlowConnection, now: Long) {
- check(_conn == null) { "Concurrent access" }
- val delegate = Connection(conn, transform)
- _conn = delegate
- source.onStart(delegate, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- val delegate = checkNotNull(_conn) { "Invariant violation" }
- _conn = null
- source.onStop(delegate, now)
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val delegate = checkNotNull(_conn) { "Invariant violation" }
- return source.onPull(delegate, now)
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- val delegate = _conn ?: return
- source.onConverge(delegate, now)
- }
-
- /**
- * The wrapper [FlowConnection] that is used to transform the flow.
- */
- private class Connection(
- private val delegate: FlowConnection,
- private val transform: (FlowConnection, Double) -> Double
- ) : FlowConnection by delegate {
- override fun push(rate: Double) {
- delegate.push(transform(this, rate))
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
deleted file mode 100644
index ee8cd739..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-
-/**
- * A [FlowSink] represents a sink with a fixed capacity.
- *
- * @param initialCapacity The initial capacity of the resource.
- * @param engine The engine that is used for driving the flow simulation.
- * @param parent The parent flow system.
- */
-public class FlowSink(
- private val engine: FlowEngine,
- initialCapacity: Double,
- private val parent: FlowConvergenceListener? = null
-) : FlowConsumer {
- /**
- * A flag to indicate that the flow consumer is active.
- */
- public override val isActive: Boolean
- get() = _ctx != null
-
- /**
- * The capacity of the consumer.
- */
- public override var capacity: Double = initialCapacity
- set(value) {
- field = value
- _ctx?.capacity = value
- }
-
- /**
- * The current processing rate of the consumer.
- */
- public override val rate: Double
- get() = _ctx?.rate ?: 0.0
-
- /**
- * The flow processing rate demand at this instant.
- */
- public override val demand: Double
- get() = _ctx?.demand ?: 0.0
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- public override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- /**
- * The current active [FlowConsumerLogic] of this sink.
- */
- private var _ctx: FlowConsumerContext? = null
-
- override fun startConsumer(source: FlowSource) {
- check(_ctx == null) { "Consumer is in invalid state" }
-
- val ctx = engine.newContext(source, Logic(parent, _counters))
- _ctx = ctx
-
- ctx.capacity = capacity
- if (parent != null) {
- ctx.shouldConsumerConverge = true
- }
-
- ctx.start()
- }
-
- override fun pull() {
- _ctx?.pull()
- }
-
- override fun cancel() {
- _ctx?.close()
- }
-
- override fun toString(): String = "FlowSink[capacity=$capacity]"
-
- /**
- * [FlowConsumerLogic] of a sink.
- */
- private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
-
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- rate: Double
- ) {
- updateCounters(ctx, now, rate, ctx.capacity)
- }
-
- override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
- updateCounters(ctx, now, 0.0, 0.0)
-
- _ctx = null
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long) {
- parent?.onConverge(now)
- }
-
- /**
- * The previous demand and capacity for the consumer.
- */
- private val _previous = DoubleArray(2)
- private var _previousUpdate = Long.MAX_VALUE
-
- /**
- * Update the counters of the flow consumer.
- */
- private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) {
- val previousUpdate = _previousUpdate
- _previousUpdate = now
- val delta = now - previousUpdate
-
- val counters = counters
- val previous = _previous
- val demand = previous[0]
- val capacity = previous[1]
-
- previous[0] = nextDemand
- previous[1] = nextCapacity
-
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta * D_MS_TO_S
- val total = demand * deltaS
- val work = capacity * deltaS
- val actualWork = ctx.rate * deltaS
-
- counters.increment(work, actualWork, (total - actualWork))
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
deleted file mode 100644
index a48ac18e..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow
-
-/**
- * A source of flow that is consumed by a [FlowConsumer].
- *
- * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
- * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
- */
-public interface FlowSource {
- /**
- * This method is invoked when the source is started.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the provider finished.
- */
- public fun onStart(conn: FlowConnection, now: Long) {}
-
- /**
- * This method is invoked when the source is finished.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the source finished.
- */
- public fun onStop(conn: FlowConnection, now: Long) {}
-
- /**
- * This method is invoked when the source is pulled.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the pull is occurring.
- * @return The duration after which the resource consumer should be pulled again.
- */
- public fun onPull(conn: FlowConnection, now: Long): Long
-
- /**
- * This method is invoked when the flow graph has converged into a steady-state system.
- *
- * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method
- * will not be invoked.
- *
- * @param conn The connection between the source and consumer.
- * @param now The virtual timestamp in milliseconds at which the system converged.
- */
- public fun onConverge(conn: FlowConnection, now: Long) {}
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
deleted file mode 100644
index 97d56fff..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * States of the flow connection.
- */
-internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source
-internal const val ConnActive = 1 // Connection is active and the source is currently being consumed
-internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore
-internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection
-
-/**
- * Flags of the flow connection
- */
-internal const val ConnPulled = 1 shl 2 // The source should be pulled
-internal const val ConnPushed = 1 shl 3 // The source has pushed a value
-internal const val ConnClose = 1 shl 4 // The connection should be closed
-internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active
-internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending
-internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
-internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
-internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
-internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
deleted file mode 100644
index fba3af5f..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import mu.KotlinLogging
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.batch
-import java.util.ArrayDeque
-import kotlin.math.min
-
-/**
- * The logging instance of this connection.
- */
-private val logger = KotlinLogging.logger {}
-
-/**
- * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
- */
-internal class FlowConsumerContextImpl(
- private val engine: FlowEngineImpl,
- private val source: FlowSource,
- private val logic: FlowConsumerLogic
-) : FlowConsumerContext {
- /**
- * The capacity of the connection.
- */
- override var capacity: Double
- get() = _capacity
- set(value) {
- val oldValue = _capacity
-
- // Only changes will be propagated
- if (value != oldValue) {
- _capacity = value
- pull()
- }
- }
- private var _capacity: Double = 0.0
-
- /**
- * The current processing rate of the connection.
- */
- override val rate: Double
- get() = _rate
- private var _rate = 0.0
-
- /**
- * The current flow processing demand.
- */
- override val demand: Double
- get() = _demand
- private var _demand: Double = 0.0 // The current (pending) demand of the source
-
- /**
- * The deadline of the source.
- */
- override val deadline: Long
- get() = _deadline
- private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
-
- /**
- * Flags to control the convergence of the consumer and source.
- */
- override var shouldSourceConverge: Boolean
- get() = _flags and ConnConvergeSource == ConnConvergeSource
- set(value) {
- _flags =
- if (value) {
- _flags or ConnConvergeSource
- } else {
- _flags and ConnConvergeSource.inv()
- }
- }
- override var shouldConsumerConverge: Boolean
- get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer
- set(value) {
- _flags =
- if (value) {
- _flags or ConnConvergeConsumer
- } else {
- _flags and ConnConvergeConsumer.inv()
- }
- }
-
- /**
- * Flag to control the timers on the [FlowSource]
- */
- override var enableTimers: Boolean
- get() = _flags and ConnDisableTimers != ConnDisableTimers
- set(value) {
- _flags =
- if (!value) {
- _flags or ConnDisableTimers
- } else {
- _flags and ConnDisableTimers.inv()
- }
- }
-
- /**
- * The clock to track simulation time.
- */
- private val _clock = engine.clock
-
- /**
- * The flags of the flow connection, indicating certain actions.
- */
- private var _flags: Int = 0
-
- /**
- * The timers at which the context is scheduled to be interrupted.
- */
- private var _timer: Long = Long.MAX_VALUE
- private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
-
- override fun start() {
- check(_flags and ConnState == ConnPending) { "Consumer is already started" }
- engine.batch {
- val now = _clock.millis()
- source.onStart(this, now)
-
- // Mark the connection as active and pulled
- val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled
- scheduleImmediate(now, newFlags)
- }
- }
-
- override fun close() {
- val flags = _flags
- if (flags and ConnState == ConnClosed) {
- return
- }
-
- // Toggle the close bit. In case no update is active, schedule a new update.
- if (flags and ConnUpdateActive == 0) {
- val now = _clock.millis()
- scheduleImmediate(now, flags or ConnClose)
- } else {
- _flags = flags or ConnClose
- }
- }
-
- override fun pull(now: Long) {
- val flags = _flags
- if (flags and ConnState != ConnActive) {
- return
- }
-
- // Mark connection as pulled
- scheduleImmediate(now, flags or ConnPulled)
- }
-
- override fun pull() {
- pull(_clock.millis())
- }
-
- override fun pullSync(now: Long) {
- val flags = _flags
-
- // Do not attempt to flush the connection if the connection is closed or an update is already active
- if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) {
- return
- }
-
- if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
- engine.scheduleSync(now, this)
- }
- }
-
- override fun push(rate: Double) {
- if (_demand == rate) {
- return
- }
-
- _demand = rate
-
- val flags = _flags
-
- if (flags and ConnUpdateActive != 0) {
- // If an update is active, it will already get picked up at the end of the update
- _flags = flags or ConnPushed
- } else {
- // Invalidate only if no update is active
- scheduleImmediate(_clock.millis(), flags or ConnPushed)
- }
- }
-
- /**
- * Update the state of the flow connection.
- *
- * @param now The current virtual timestamp.
- * @param visited The queue of connections that have been visited during the cycle.
- * @param timerQueue The queue of all pending timers.
- * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update.
- */
- fun doUpdate(
- now: Long,
- visited: FlowDeque,
- timerQueue: FlowTimerQueue,
- isImmediate: Boolean
- ) {
- var flags = _flags
-
- // Precondition: The flow connection must be active
- if (flags and ConnState != ConnActive) {
- return
- }
-
- val deadline = _deadline
- val reachedDeadline = deadline == now
- var newDeadline: Long
- var hasUpdated = false
-
- try {
- // Pull the source if (1) `pull` is called or (2) the timer of the source has expired
- newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
- // Update state before calling into the outside world, so it observes a consistent state
- _flags = (flags and ConnPulled.inv()) or ConnUpdateActive
- hasUpdated = true
-
- val duration = source.onPull(this, now)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = _flags
-
- if (duration != Long.MAX_VALUE) {
- now + duration
- } else {
- duration
- }
- } else {
- deadline
- }
-
- // Make the new deadline available for the consumer if it has changed
- if (newDeadline != deadline) {
- _deadline = newDeadline
- }
-
- // Push to the consumer if the rate of the source has changed (after a call to `push`)
- if (flags and ConnPushed != 0) {
- // Update state before calling into the outside world, so it observes a consistent state
- _flags = (flags and ConnPushed.inv()) or ConnUpdateActive
- hasUpdated = true
-
- logic.onPush(this, now, _demand)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = _flags
- }
-
- // Check whether the source or consumer have tried to close the connection
- if (flags and ConnClose != 0) {
- hasUpdated = true
-
- // The source has called [FlowConnection.close], so clean up the connection
- doStopSource(now)
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- // We now also mark the connection as closed
- flags = (_flags and ConnState.inv()) or ConnClosed
-
- _demand = 0.0
- newDeadline = Long.MAX_VALUE
- }
- } catch (cause: Throwable) {
- hasUpdated = true
-
- // Clean up the connection
- doFailSource(now, cause)
-
- // Mark the connection as closed
- flags = (flags and ConnState.inv()) or ConnClosed
-
- _demand = 0.0
- newDeadline = Long.MAX_VALUE
- }
-
- // Check whether the connection needs to be added to the visited queue. This is the case when:
- // (1) An update was performed (either a push or a pull)
- // (2) Either the source or consumer want to converge, and
- // (3) Convergence is not already pending (ConnConvergePending)
- if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) {
- visited.add(this)
- flags = flags or ConnConvergePending
- }
-
- // Compute the new flow rate of the connection
- // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value
- _rate = min(_capacity, _demand)
-
- // Indicate that no update is active anymore and flush the flags
- _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv()
-
- val pendingTimers = _pendingTimers
-
- // Prune the head timer if this is a delayed update
- val timer = if (!isImmediate) {
- // Invariant: Any pending timer should only point to a future timestamp
- val timer = pendingTimers.poll() ?: Long.MAX_VALUE
- _timer = timer
- timer
- } else {
- _timer
- }
-
- // Check whether we need to schedule a new timer for this connection. That is the case when:
- // (1) The deadline is valid (not the maximum value)
- // (2) The connection is active
- // (3) Timers are not disabled for the source
- // (4) The current active timer for the connection points to a later deadline
- if (newDeadline == Long.MAX_VALUE ||
- flags and ConnState != ConnActive ||
- flags and ConnDisableTimers != 0 ||
- (timer != Long.MAX_VALUE && newDeadline >= timer)
- ) {
- // Ignore any deadline scheduled at the maximum value
- // This indicates that the source does not want to register a timer
- return
- }
-
- // Construct a timer for the new deadline and add it to the global queue of timers
- _timer = newDeadline
- timerQueue.add(this, newDeadline)
-
- // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
- if (timer != Long.MAX_VALUE) {
- pendingTimers.addFirst(timer)
- }
- }
-
- /**
- * This method is invoked when the system converges into a steady state.
- */
- fun onConverge(now: Long) {
- try {
- val flags = _flags
-
- // The connection is converging now, so unset the convergence pending flag
- _flags = flags and ConnConvergePending.inv()
-
- // Call the source converge callback if it has enabled convergence
- if (flags and ConnConvergeSource != 0) {
- source.onConverge(this, now)
- }
-
- // Call the consumer callback if it has enabled convergence
- if (flags and ConnConvergeConsumer != 0) {
- logic.onConverge(this, now)
- }
- } catch (cause: Throwable) {
- // Invoke the finish callbacks
- doFailSource(now, cause)
-
- // Mark the connection as closed
- _flags = (_flags and ConnState.inv()) or ConnClosed
- _demand = 0.0
- _deadline = Long.MAX_VALUE
- }
- }
-
- override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
-
- /**
- * Stop the [FlowSource].
- */
- private fun doStopSource(now: Long) {
- try {
- source.onStop(this, now)
- doFinishConsumer(now, null)
- } catch (cause: Throwable) {
- doFinishConsumer(now, cause)
- }
- }
-
- /**
- * Fail the [FlowSource].
- */
- private fun doFailSource(now: Long, cause: Throwable) {
- try {
- source.onStop(this, now)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- doFinishConsumer(now, e)
- }
- }
-
- /**
- * Finish the consumer.
- */
- private fun doFinishConsumer(now: Long, cause: Throwable?) {
- try {
- logic.onFinish(this, now, cause)
- } catch (e: Throwable) {
- e.addSuppressed(cause)
- logger.error(e) { "Uncaught exception" }
- }
- }
-
- /**
- * Schedule an immediate update for this connection.
- */
- private fun scheduleImmediate(now: Long, flags: Int) {
- // In case an immediate update is already scheduled, no need to do anything
- if (flags and ConnUpdatePending != 0) {
- _flags = flags
- return
- }
-
- // Mark the connection that there is an update pending
- _flags = flags or ConnUpdatePending
-
- engine.scheduleImmediate(now, this)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
deleted file mode 100644
index 403a9aec..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import java.util.ArrayDeque
-
-/**
- * A specialized [ArrayDeque] that tracks the [FlowConsumerContextImpl] instances that have updated in an interpreter
- * cycle.
- *
- * By using a specialized class, we reduce the overhead caused by type-erasure.
- */
-internal class FlowDeque(initialCapacity: Int = 256) {
- /**
- * The array of elements in the queue.
- */
- private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
- private var _head = 0
- private var _tail = 0
-
- /**
- * Determine whether this queue is not empty.
- */
- fun isNotEmpty(): Boolean {
- return _head != _tail
- }
-
- /**
- * Add the specified [ctx] to the queue.
- */
- fun add(ctx: FlowConsumerContextImpl) {
- val es = _elements
- var tail = _tail
-
- es[tail] = ctx
-
- tail = inc(tail, es.size)
- _tail = tail
-
- if (_head == tail) {
- doubleCapacity()
- }
- }
-
- /**
- * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
- */
- fun poll(): FlowConsumerContextImpl? {
- val es = _elements
- val head = _head
- val ctx = es[head]
-
- if (ctx != null) {
- es[head] = null
- _head = inc(head, es.size)
- }
-
- return ctx
- }
-
- /**
- * Clear the queue.
- */
- fun clear() {
- _elements.fill(null)
- _head = 0
- _tail = 0
- }
-
- private fun inc(i: Int, modulus: Int): Int {
- var x = i
- if (++x >= modulus) {
- x = 0
- }
- return x
- }
-
- /**
- * Doubles the capacity of this deque
- */
- private fun doubleCapacity() {
- assert(_head == _tail)
- val p = _head
- val n = _elements.size
- val r = n - p // number of elements to the right of p
-
- val newCapacity = n shl 1
- check(newCapacity >= 0) { "Sorry, deque too big" }
-
- val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
-
- _elements.copyInto(a, 0, p, n)
- _elements.copyInto(a, r, 0, p)
-
- _elements = a
- _head = 0
- _tail = n
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
deleted file mode 100644
index 6fd1ef31..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-import kotlinx.coroutines.Delay
-import kotlinx.coroutines.DisposableHandle
-import kotlinx.coroutines.InternalCoroutinesApi
-import kotlinx.coroutines.Runnable
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
-import java.time.Clock
-import java.util.ArrayDeque
-import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.CoroutineContext
-
-/**
- * Internal implementation of the [FlowEngine] interface.
- *
- * @param context The coroutine context to use.
- * @param clock The virtual simulation clock.
- */
-internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable {
- /**
- * The [Delay] instance that provides scheduled execution of [Runnable]s.
- */
- @OptIn(InternalCoroutinesApi::class)
- private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
-
- /**
- * The queue of connection updates that are scheduled for immediate execution.
- */
- private val queue = FlowDeque()
-
- /**
- * A priority queue containing the connection updates to be scheduled in the future.
- */
- private val futureQueue = FlowTimerQueue()
-
- /**
- * The stack of engine invocations to occur in the future.
- */
- private val futureInvocations = ArrayDeque<Invocation>()
-
- /**
- * The systems that have been visited during the engine cycle.
- */
- private val visited = FlowDeque()
-
- /**
- * The index in the batch stack.
- */
- private var batchIndex = 0
-
- /**
- * The virtual [Clock] of this engine.
- */
- override val clock: Clock
- get() = _clock
- private val _clock: Clock = clock
-
- /**
- * Update the specified [ctx] synchronously.
- */
- fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
-
- // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
- // up by the active engine.
- if (batchIndex > 0) {
- return
- }
-
- doRunEngine(now)
- }
-
- /**
- * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
- *
- * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
- * re-computed. In case no engine is currently active, the engine will be started.
- */
- fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
- queue.add(ctx)
-
- // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
- // up by the active engine.
- if (batchIndex > 0) {
- return
- }
-
- doRunEngine(now)
- }
-
- override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
-
- override fun pushBatch() {
- batchIndex++
- }
-
- override fun popBatch() {
- try {
- // Flush the work if the engine is not already running
- if (batchIndex == 1 && queue.isNotEmpty()) {
- doRunEngine(_clock.millis())
- }
- } finally {
- batchIndex--
- }
- }
-
- /* Runnable */
- override fun run() {
- val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
- doRunEngine(invocation.timestamp)
- }
-
- /**
- * Run all the enqueued actions for the specified [timestamp][now].
- */
- private fun doRunEngine(now: Long) {
- val queue = queue
- val futureQueue = futureQueue
- val futureInvocations = futureInvocations
- val visited = visited
-
- try {
- // Increment batch index so synchronous calls will not launch concurrent engine invocations
- batchIndex++
-
- // Execute all scheduled updates at current timestamp
- while (true) {
- val ctx = futureQueue.poll(now) ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
- }
-
- // Repeat execution of all immediate updates until the system has converged to a steady-state
- // We have to take into account that the onConverge callback can also trigger new actions.
- do {
- // Execute all immediate updates
- while (true) {
- val ctx = queue.poll() ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
- }
-
- while (true) {
- val ctx = visited.poll() ?: break
- ctx.onConverge(now)
- }
- } while (queue.isNotEmpty())
- } finally {
- // Decrement batch index to indicate no engine is active at the moment
- batchIndex--
- }
-
- // Schedule an engine invocation for the next update to occur.
- val headDeadline = futureQueue.peekDeadline()
- if (headDeadline != Long.MAX_VALUE) {
- trySchedule(now, futureInvocations, headDeadline)
- }
- }
-
- /**
- * Try to schedule an engine invocation at the specified [target].
- *
- * @param now The current virtual timestamp.
- * @param target The virtual timestamp at which the engine invocation should happen.
- * @param scheduled The queue of scheduled invocations.
- */
- private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- val head = scheduled.peek()
-
- // Only schedule a new scheduler invocation in case the target is earlier than all other pending
- // scheduler invocations
- if (head == null || target < head.timestamp) {
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, this, context)
- scheduled.addFirst(Invocation(target, handle))
- }
- }
-
- /**
- * A future engine invocation.
- *
- * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
- * the invocation is not needed anymore, it can be cancelled via [cancel].
- */
- private class Invocation(
- @JvmField val timestamp: Long,
- @JvmField val handle: DisposableHandle
- ) {
- /**
- * Cancel the engine invocation.
- */
- fun cancel() = handle.dispose()
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
deleted file mode 100644
index 47061a91..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.internal
-
-/**
- * Specialized priority queue for flow timers.
- *
- * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
- * being generic.
- */
-internal class FlowTimerQueue(initialCapacity: Int = 256) {
- /**
- * The binary heap of deadlines.
- */
- private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
-
- /**
- * The binary heap of [FlowConsumerContextImpl]s.
- */
- private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
-
- /**
- * The number of elements in the priority queue.
- */
- private var size = 0
-
- /**
- * Register a timer for [ctx] with [deadline].
- */
- fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
- val i = size
- var deadlines = _deadlines
- if (i >= deadlines.size) {
- grow()
- // Re-fetch the resized array
- deadlines = _deadlines
- }
-
- siftUp(deadlines, _pending, i, ctx, deadline)
-
- size = i + 1
- }
-
- /**
- * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
- */
- fun poll(now: Long): FlowConsumerContextImpl? {
- if (size == 0) {
- return null
- }
-
- val deadlines = _deadlines
- val deadline = deadlines[0]
-
- if (now < deadline) {
- return null
- }
-
- val pending = _pending
- val res = pending[0]
- val s = --size
-
- val nextDeadline = deadlines[s]
- val next = pending[s]!!
-
- // Clear the last element of the queue
- pending[s] = null
- deadlines[s] = Long.MIN_VALUE
-
- if (s != 0) {
- siftDown(deadlines, pending, next, nextDeadline)
- }
-
- return res
- }
-
- /**
- * Find the earliest deadline in the queue.
- */
- fun peekDeadline(): Long {
- return if (size == 0) Long.MAX_VALUE else _deadlines[0]
- }
-
- /**
- * Increases the capacity of the array.
- */
- private fun grow() {
- val oldCapacity = _deadlines.size
- // Double size if small; else grow by 50%
- val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
-
- _deadlines = _deadlines.copyOf(newCapacity)
- _pending = _pending.copyOf(newCapacity)
- }
-
- /**
- * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
- * greater than or equal to its parent, or is the root.
- *
- * @param deadlines The heap of deadlines.
- * @param pending The heap of contexts.
- * @param pos The position to fill.
- * @param ctx The [FlowConsumerContextImpl] to insert.
- * @param deadline The deadline of the context.
- */
- private fun siftUp(
- deadlines: LongArray,
- pending: Array<FlowConsumerContextImpl?>,
- pos: Int,
- ctx: FlowConsumerContextImpl,
- deadline: Long
- ) {
- var k = pos
-
- while (k > 0) {
- val parent = (k - 1) ushr 1
- val parentDeadline = deadlines[parent]
-
- if (deadline >= parentDeadline) {
- break
- }
-
- deadlines[k] = parentDeadline
- pending[k] = pending[parent]
-
- k = parent
- }
-
- deadlines[k] = deadline
- pending[k] = ctx
- }
-
- /**
- * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
- * is less than or equal to its children or is a leaf.
- *
- * @param deadlines The heap of deadlines.
- * @param pending The heap of contexts.
- * @param ctx The [FlowConsumerContextImpl] to insert.
- * @param deadline The deadline of the context.
- */
- private fun siftDown(
- deadlines: LongArray,
- pending: Array<FlowConsumerContextImpl?>,
- ctx: FlowConsumerContextImpl,
- deadline: Long
- ) {
- var k = 0
- val size = size
- val half = size ushr 1
-
- while (k < half) {
- var child = (k shl 1) + 1
-
- var childDeadline = deadlines[child]
- val right = child + 1
-
- if (right < size) {
- val rightDeadline = deadlines[right]
-
- if (childDeadline > rightDeadline) {
- child = right
- childDeadline = rightDeadline
- }
- }
-
- if (deadline <= childDeadline) {
- break
- }
-
- deadlines[k] = childDeadline
- pending[k] = pending[child]
-
- k = child
- }
-
- deadlines[k] = deadline
- pending[k] = ctx
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
deleted file mode 100644
index 8752c559..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
- */
-public interface FlowMultiplexer {
- /**
- * The maximum number of inputs supported by the multiplexer.
- */
- public val maxInputs: Int
-
- /**
- * The maximum number of outputs supported by the multiplexer.
- */
- public val maxOutputs: Int
-
- /**
- * The inputs of the multiplexer that can be used to consume sources.
- */
- public val inputs: Set<FlowConsumer>
-
- /**
- * The outputs of the multiplexer over which the flows will be distributed.
- */
- public val outputs: Set<FlowSource>
-
- /**
- * The actual processing rate of the multiplexer.
- */
- public val rate: Double
-
- /**
- * The demanded processing rate of the input.
- */
- public val demand: Double
-
- /**
- * The capacity of the outputs.
- */
- public val capacity: Double
-
- /**
- * The flow counters to track the flow metrics of all multiplexer inputs.
- */
- public val counters: FlowCounters
-
- /**
- * Create a new input on this multiplexer with a coupled capacity.
- */
- public fun newInput(): FlowConsumer
-
- /**
- * Create a new input on this multiplexer with the specified [capacity].
- *
- * @param capacity The capacity of the input.
- */
- public fun newInput(capacity: Double): FlowConsumer
-
- /**
- * Remove [input] from this multiplexer.
- */
- public fun removeInput(input: FlowConsumer)
-
- /**
- * Create a new output on this multiplexer.
- */
- public fun newOutput(): FlowSource
-
- /**
- * Remove [output] from this multiplexer.
- */
- public fun removeOutput(output: FlowSource)
-
- /**
- * Clear all inputs and outputs from the multiplexer.
- */
- public fun clear()
-
- /**
- * Clear the inputs of the multiplexer.
- */
- public fun clearInputs()
-
- /**
- * Clear the outputs of the multiplexer.
- */
- public fun clearOutputs()
-
- /**
- * Flush the counters of the multiplexer.
- */
- public fun flushCounters()
-
- /**
- * Flush the counters of the specified [input].
- */
- public fun flushCounters(input: FlowConsumer)
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
deleted file mode 100644
index a863e3ad..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowEngine
-
-/**
- * Factory interface for a [FlowMultiplexer] implementation.
- */
-public fun interface FlowMultiplexerFactory {
- /**
- * Construct a new [FlowMultiplexer] using the specified [engine] and [listener].
- */
- public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer
-
- public companion object {
- /**
- * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer].
- */
- private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) }
-
- /**
- * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer].
- */
- private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) }
-
- /**
- * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances.
- */
- @JvmStatic
- public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY
-
- /**
- * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances.
- */
- @JvmStatic
- public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
deleted file mode 100644
index 53f94a94..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowForwarder
-import org.opendc.simulator.flow.FlowSource
-import java.util.ArrayDeque
-
-/**
- * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
- * that a single input is directly connected to an output and that the multiplexer can only support as many
- * inputs as outputs.
- *
- * @param engine The [FlowEngine] driving the simulation.
- * @param listener The convergence listener of the multiplexer.
- */
-public class ForwardingFlowMultiplexer(
- private val engine: FlowEngine,
- private val listener: FlowConvergenceListener? = null
-) : FlowMultiplexer, FlowConvergenceListener {
-
- override val maxInputs: Int
- get() = _outputs.size
-
- override val maxOutputs: Int = Int.MAX_VALUE
-
- override val inputs: Set<FlowConsumer>
- get() = _inputs
- private val _inputs = mutableSetOf<Input>()
-
- override val outputs: Set<FlowSource>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
- private val _availableOutputs = ArrayDeque<Output>()
-
- override val counters: FlowCounters = object : FlowCounters {
- override val demand: Double
- get() = _outputs.sumOf { it.forwarder.counters.demand }
- override val actual: Double
- get() = _outputs.sumOf { it.forwarder.counters.actual }
- override val remaining: Double
- get() = _outputs.sumOf { it.forwarder.counters.remaining }
-
- override fun reset() {
- for (output in _outputs) {
- output.forwarder.counters.reset()
- }
- }
-
- override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
- }
-
- override val rate: Double
- get() = _outputs.sumOf { it.forwarder.rate }
-
- override val demand: Double
- get() = _outputs.sumOf { it.forwarder.demand }
-
- override val capacity: Double
- get() = _outputs.sumOf { it.forwarder.capacity }
-
- override fun newInput(): FlowConsumer {
- val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
- val input = Input(output)
- _inputs += input
- return input
- }
-
- override fun newInput(capacity: Double): FlowConsumer = newInput()
-
- override fun removeInput(input: FlowConsumer) {
- if (!_inputs.remove(input)) {
- return
- }
-
- val output = (input as Input).output
- output.forwarder.cancel()
- _availableOutputs += output
- }
-
- override fun newOutput(): FlowSource {
- val forwarder = FlowForwarder(engine, this)
- val output = Output(forwarder)
-
- _outputs += output
- return output
- }
-
- override fun removeOutput(output: FlowSource) {
- if (!_outputs.remove(output)) {
- return
- }
-
- val forwarder = (output as Output).forwarder
- forwarder.close()
- }
-
- override fun clearInputs() {
- for (input in _inputs) {
- val output = input.output
- output.forwarder.cancel()
- _availableOutputs += output
- }
-
- _inputs.clear()
- }
-
- override fun clearOutputs() {
- for (output in _outputs) {
- output.forwarder.cancel()
- }
- _outputs.clear()
- _availableOutputs.clear()
- }
-
- override fun clear() {
- clearOutputs()
- clearInputs()
- }
-
- override fun flushCounters() {}
-
- override fun flushCounters(input: FlowConsumer) {}
-
- override fun onConverge(now: Long) {
- listener?.onConverge(now)
- }
-
- /**
- * An input on the multiplexer.
- */
- private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
- override fun toString(): String = "ForwardingFlowMultiplexer.Input"
- }
-
- /**
- * An output on the multiplexer.
- */
- private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
- override fun onStart(conn: FlowConnection, now: Long) {
- _availableOutputs += this
- forwarder.onStart(conn, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- forwarder.cancel()
- forwarder.onStop(conn, now)
- }
-
- override fun toString(): String = "ForwardingFlowMultiplexer.Output"
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
deleted file mode 100644
index d9c6f893..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ /dev/null
@@ -1,811 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.mux
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowConsumer
-import org.opendc.simulator.flow.FlowConsumerContext
-import org.opendc.simulator.flow.FlowConsumerLogic
-import org.opendc.simulator.flow.FlowConvergenceListener
-import org.opendc.simulator.flow.FlowCounters
-import org.opendc.simulator.flow.FlowEngine
-import org.opendc.simulator.flow.FlowSource
-import org.opendc.simulator.flow.internal.D_MS_TO_S
-import org.opendc.simulator.flow.internal.MutableFlowCounters
-import kotlin.math.min
-
-/**
- * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
- *
- * @param engine The [FlowEngine] to drive the flow simulation.
- * @param parent The parent flow system of the multiplexer.
- */
-public class MaxMinFlowMultiplexer(
- private val engine: FlowEngine,
- parent: FlowConvergenceListener? = null
-) : FlowMultiplexer {
-
- override val maxInputs: Int = Int.MAX_VALUE
-
- override val maxOutputs: Int = Int.MAX_VALUE
-
- /**
- * The inputs of the multiplexer.
- */
- override val inputs: Set<FlowConsumer>
- get() = _inputs
- private val _inputs = mutableSetOf<Input>()
-
- /**
- * The outputs of the multiplexer.
- */
- override val outputs: Set<FlowSource>
- get() = _outputs
- private val _outputs = mutableSetOf<Output>()
-
- /**
- * The flow counters of this multiplexer.
- */
- public override val counters: FlowCounters
- get() = scheduler.counters
-
- /**
- * The actual processing rate of the multiplexer.
- */
- public override val rate: Double
- get() = scheduler.rate
-
- /**
- * The demanded processing rate of the input.
- */
- public override val demand: Double
- get() = scheduler.demand
-
- /**
- * The capacity of the outputs.
- */
- public override val capacity: Double
- get() = scheduler.capacity
-
- /**
- * The [Scheduler] instance of this multiplexer.
- */
- private val scheduler = Scheduler(engine, parent)
-
- override fun newInput(): FlowConsumer {
- return newInput(isCoupled = true, Double.POSITIVE_INFINITY)
- }
-
- override fun newInput(capacity: Double): FlowConsumer {
- return newInput(isCoupled = false, capacity)
- }
-
- private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer {
- val provider = Input(engine, scheduler, isCoupled, initialCapacity)
- _inputs.add(provider)
- return provider
- }
-
- override fun removeInput(input: FlowConsumer) {
- if (!_inputs.remove(input)) {
- return
- }
- // This cast should always succeed since only `Input` instances should be added to `_inputs`
- (input as Input).close()
- }
-
- override fun newOutput(): FlowSource {
- val output = Output(scheduler)
- _outputs.add(output)
- return output
- }
-
- override fun removeOutput(output: FlowSource) {
- if (!_outputs.remove(output)) {
- return
- }
-
- // This cast should always succeed since only `Output` instances should be added to `_outputs`
- (output as Output).cancel()
- }
-
- override fun clearInputs() {
- for (input in _inputs) {
- input.cancel()
- }
- _inputs.clear()
- }
-
- override fun clearOutputs() {
- for (output in _outputs) {
- output.cancel()
- }
- _outputs.clear()
- }
-
- override fun clear() {
- clearOutputs()
- clearInputs()
- }
-
- override fun flushCounters() {
- scheduler.updateCounters(engine.clock.millis())
- }
-
- override fun flushCounters(input: FlowConsumer) {
- (input as Input).doUpdateCounters(engine.clock.millis())
- }
-
- /**
- * Helper class containing the scheduler state.
- */
- private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
- /**
- * The flow counters of this scheduler.
- */
- @JvmField val counters = MutableFlowCounters()
-
- /**
- * The flow rate of the multiplexer.
- */
- @JvmField var rate = 0.0
-
- /**
- * The demand for the multiplexer.
- */
- @JvmField var demand = 0.0
-
- /**
- * The capacity of the multiplexer.
- */
- @JvmField var capacity = 0.0
-
- /**
- * An [Output] that is used to activate the scheduler.
- */
- @JvmField var activationOutput: Output? = null
-
- /**
- * The active inputs registered with the scheduler.
- */
- private val _activeInputs = mutableListOf<Input>()
-
- /**
- * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
- */
- private var _inputArray = emptyArray<Input>()
-
- /**
- * The active outputs registered with the scheduler.
- */
- private val _activeOutputs = mutableListOf<Output>()
-
- /**
- * Flag to indicate that the scheduler is active.
- */
- private var _schedulerActive = false
-
- /**
- * The last convergence timestamp and the input.
- */
- private var _lastConverge: Long = Long.MIN_VALUE
- private var _lastConvergeInput: Input? = null
-
- /**
- * The simulation clock.
- */
- private val _clock = engine.clock
-
- /**
- * Register the specified [input] to this scheduler.
- */
- fun registerInput(input: Input) {
- _activeInputs.add(input)
- _inputArray = _activeInputs.toTypedArray()
-
- val hasActivationOutput = activationOutput != null
-
- // Disable timers and convergence of the source if one of the output manages it
- input.shouldConsumerConverge = !hasActivationOutput
- input.enableTimers = !hasActivationOutput
-
- if (input.isCoupled) {
- input.capacity = capacity
- }
-
- trigger(_clock.millis())
- }
-
- /**
- * De-register the specified [input] from this scheduler.
- */
- fun deregisterInput(input: Input, now: Long) {
- // Assign a new input responsible for handling the convergence events
- if (_lastConvergeInput == input) {
- _lastConvergeInput = null
- }
-
- _activeInputs.remove(input)
-
- // Re-run scheduler to distribute new load
- trigger(now)
- }
-
- /**
- * This method is invoked when one of the inputs converges.
- */
- fun convergeInput(input: Input, now: Long) {
- val lastConverge = _lastConverge
- val lastConvergeInput = _lastConvergeInput
- val parent = parent
-
- if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) {
- _lastConverge = now
- _lastConvergeInput = input
-
- parent.onConverge(now)
- }
- }
-
- /**
- * Register the specified [output] to this scheduler.
- */
- fun registerOutput(output: Output) {
- _activeOutputs.add(output)
-
- updateCapacity()
- updateActivationOutput()
- }
-
- /**
- * De-register the specified [output] from this scheduler.
- */
- fun deregisterOutput(output: Output, now: Long) {
- _activeOutputs.remove(output)
- updateCapacity()
-
- trigger(now)
- }
-
- /**
- * This method is invoked when one of the outputs converges.
- */
- fun convergeOutput(output: Output, now: Long) {
- val parent = parent
-
- if (parent != null) {
- _lastConverge = now
- parent.onConverge(now)
- }
-
- if (!output.isActive) {
- output.isActivationOutput = false
- updateActivationOutput()
- }
- }
-
- /**
- * Trigger the scheduler of the multiplexer.
- *
- * @param now The current virtual timestamp of the simulation.
- */
- fun trigger(now: Long) {
- if (_schedulerActive) {
- // No need to trigger the scheduler in case it is already active
- return
- }
-
- val activationOutput = activationOutput
-
- // We can run the scheduler in two ways:
- // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
- // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
- // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
- // a few inputs and little changes at the same timestamp.
- // We always pick for option (1) unless there are no outputs available.
- if (activationOutput != null) {
- activationOutput.pull(now)
- return
- } else {
- runScheduler(now)
- }
- }
-
- /**
- * Synchronously run the scheduler of the multiplexer.
- */
- fun runScheduler(now: Long): Long {
- return try {
- _schedulerActive = true
- doRunScheduler(now)
- } finally {
- _schedulerActive = false
- }
- }
-
- /**
- * Recompute the capacity of the multiplexer.
- */
- fun updateCapacity() {
- val newCapacity = _activeOutputs.sumOf(Output::capacity)
-
- // No-op if the capacity is unchanged
- if (capacity == newCapacity) {
- return
- }
-
- capacity = newCapacity
-
- for (input in _activeInputs) {
- if (input.isCoupled) {
- input.capacity = newCapacity
- }
- }
-
- // Sort outputs by their capacity
- _activeOutputs.sort()
- }
-
- /**
- * Updates the output that is used for scheduler activation.
- */
- private fun updateActivationOutput() {
- val output = _activeOutputs.firstOrNull()
- activationOutput = output
-
- if (output != null) {
- output.isActivationOutput = true
- }
-
- val hasActivationOutput = output != null
-
- for (input in _activeInputs) {
- input.shouldConsumerConverge = !hasActivationOutput
- input.enableTimers = !hasActivationOutput
- }
- }
-
- /**
- * Schedule the inputs over the outputs.
- *
- * @return The deadline after which a new scheduling cycle should start.
- */
- private fun doRunScheduler(now: Long): Long {
- val activeInputs = _activeInputs
- val activeOutputs = _activeOutputs
- var inputArray = _inputArray
- var inputSize = _inputArray.size
-
- // Update the counters of the scheduler
- updateCounters(now)
-
- // If there is no work yet, mark the inputs as idle.
- if (inputSize == 0) {
- demand = 0.0
- rate = 0.0
- return Long.MAX_VALUE
- }
-
- val capacity = capacity
- var availableCapacity = capacity
- var deadline = Long.MAX_VALUE
- var demand = 0.0
- var shouldRebuild = false
-
- // Pull in the work of the inputs
- for (i in 0 until inputSize) {
- val input = inputArray[i]
-
- input.pullSync(now)
-
- // Remove inputs that have finished
- if (!input.isActive) {
- input.actualRate = 0.0
- shouldRebuild = true
- } else {
- demand += input.limit
- deadline = min(deadline, input.deadline)
- }
- }
-
- // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
- if (shouldRebuild) {
- inputArray = activeInputs.toTypedArray()
- inputSize = inputArray.size
- _inputArray = inputArray
- }
-
- val rate = if (demand > capacity) {
- // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
- // constrained capacity across the inputs.
-
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- inputArray.sort()
-
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- for (i in 0 until inputSize) {
- val input = inputArray[i]
- val availableShare = availableCapacity / (inputSize - i)
- val grantedRate = min(input.allowedRate, availableShare)
-
- availableCapacity -= grantedRate
- input.actualRate = grantedRate
- }
-
- capacity - availableCapacity
- } else {
- demand
- }
-
- this.demand = demand
- if (this.rate != rate) {
- // Only update the outputs if the output rate has changed
- this.rate = rate
-
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
-
- output.push(grantedSpeed)
- }
- }
-
- return deadline
- }
-
- /**
- * The previous capacity of the multiplexer.
- */
- private var _previousCapacity = 0.0
- private var _previousUpdate = Long.MIN_VALUE
-
- /**
- * Update the counters of the scheduler.
- */
- fun updateCounters(now: Long) {
- val previousCapacity = _previousCapacity
- _previousCapacity = capacity
-
- val previousUpdate = _previousUpdate
- _previousUpdate = now
-
- val delta = now - previousUpdate
- if (delta <= 0) {
- return
- }
-
- val deltaS = delta * D_MS_TO_S
- val demand = demand
- val rate = rate
-
- counters.increment(
- demand = demand * deltaS,
- actual = rate * deltaS,
- remaining = (previousCapacity - rate) * deltaS
- )
- }
- }
-
- /**
- * An internal [FlowConsumer] implementation for multiplexer inputs.
- */
- private class Input(
- private val engine: FlowEngine,
- private val scheduler: Scheduler,
- @JvmField val isCoupled: Boolean,
- initialCapacity: Double
- ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
- /**
- * A flag to indicate that the consumer is active.
- */
- override val isActive: Boolean
- get() = _ctx != null
-
- /**
- * The demand of the consumer.
- */
- override val demand: Double
- get() = limit
-
- /**
- * The processing rate of the consumer.
- */
- override val rate: Double
- get() = actualRate
-
- /**
- * The capacity of the input.
- */
- override var capacity: Double
- get() = _capacity
- set(value) {
- allowedRate = min(limit, value)
- _capacity = value
- _ctx?.capacity = value
- }
- private var _capacity = initialCapacity
-
- /**
- * The flow counters to track the flow metrics of the consumer.
- */
- override val counters: FlowCounters
- get() = _counters
- private val _counters = MutableFlowCounters()
-
- /**
- * A flag to enable timers for the input.
- */
- var enableTimers: Boolean = true
- set(value) {
- field = value
- _ctx?.enableTimers = value
- }
-
- /**
- * A flag to control whether the input should converge.
- */
- var shouldConsumerConverge: Boolean = true
- set(value) {
- field = value
- _ctx?.shouldConsumerConverge = value
- }
-
- /**
- * The requested limit.
- */
- @JvmField var limit: Double = 0.0
-
- /**
- * The actual processing speed.
- */
- @JvmField var actualRate: Double = 0.0
-
- /**
- * The processing rate that is allowed by the model constraints.
- */
- @JvmField var allowedRate: Double = 0.0
-
- /**
- * The deadline of the input.
- */
- val deadline: Long
- get() = _ctx?.deadline ?: Long.MAX_VALUE
-
- /**
- * The [FlowConsumerContext] that is currently running.
- */
- private var _ctx: FlowConsumerContext? = null
-
- /**
- * A flag to indicate that the input is closed.
- */
- private var _isClosed: Boolean = false
-
- /**
- * Close the input.
- *
- * This method is invoked when the user removes an input from the switch.
- */
- fun close() {
- _isClosed = true
- cancel()
- }
-
- /**
- * Pull the source if necessary.
- */
- fun pullSync(now: Long) {
- _ctx?.pullSync(now)
- }
-
- /* FlowConsumer */
- override fun startConsumer(source: FlowSource) {
- check(!_isClosed) { "Cannot re-use closed input" }
- check(_ctx == null) { "Consumer is in invalid state" }
-
- val ctx = engine.newContext(source, this)
- _ctx = ctx
-
- ctx.capacity = capacity
- scheduler.registerInput(this)
-
- ctx.start()
- }
-
- override fun pull() {
- _ctx?.pull()
- }
-
- override fun cancel() {
- _ctx?.close()
- }
-
- /* FlowConsumerLogic */
- override fun onPush(
- ctx: FlowConsumerContext,
- now: Long,
- rate: Double
- ) {
- doUpdateCounters(now)
-
- val allowed = min(rate, capacity)
- limit = rate
- actualRate = allowed
- allowedRate = allowed
-
- scheduler.trigger(now)
- }
-
- override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {
- doUpdateCounters(now)
-
- limit = 0.0
- actualRate = 0.0
- allowedRate = 0.0
-
- scheduler.deregisterInput(this, now)
-
- _ctx = null
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long) {
- scheduler.convergeInput(this, now)
- }
-
- /* Comparable */
- override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
-
- /**
- * The timestamp that the counters where last updated.
- */
- private var _lastUpdate = Long.MIN_VALUE
-
- /**
- * Helper method to update the flow counters of the multiplexer.
- */
- fun doUpdateCounters(now: Long) {
- val lastUpdate = _lastUpdate
- _lastUpdate = now
-
- val delta = (now - lastUpdate).coerceAtLeast(0)
- if (delta <= 0L) {
- return
- }
-
- val actualRate = actualRate
-
- val deltaS = delta * D_MS_TO_S
- val demand = limit * deltaS
- val actual = actualRate * deltaS
- val remaining = (_capacity - actualRate) * deltaS
-
- _counters.increment(demand, actual, remaining)
- scheduler.counters.increment(0.0, 0.0, 0.0)
- }
- }
-
- /**
- * An internal [FlowSource] implementation for multiplexer outputs.
- */
- private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> {
- /**
- * The active [FlowConnection] of this source.
- */
- private var _conn: FlowConnection? = null
-
- /**
- * The capacity of this output.
- */
- @JvmField var capacity: Double = 0.0
-
- /**
- * A flag to indicate that this output is the activation output.
- */
- var isActivationOutput: Boolean
- get() = _isActivationOutput
- set(value) {
- _isActivationOutput = value
- _conn?.shouldSourceConverge = value
- }
- private var _isActivationOutput: Boolean = false
-
- /**
- * A flag to indicate that the output is active.
- */
- @JvmField var isActive = false
-
- /**
- * Push the specified rate to the consumer.
- */
- fun push(rate: Double) {
- _conn?.push(rate)
- }
-
- /**
- * Cancel this output.
- */
- fun cancel() {
- _conn?.close()
- }
-
- /**
- * Pull this output.
- */
- fun pull(now: Long) {
- _conn?.pull(now)
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- assert(_conn == null) { "Source running concurrently" }
- _conn = conn
- capacity = conn.capacity
- isActive = true
-
- scheduler.registerOutput(this)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _conn = null
- capacity = 0.0
- isActive = false
-
- scheduler.deregisterOutput(this, now)
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val capacity = capacity
- if (capacity != conn.capacity) {
- this.capacity = capacity
- scheduler.updateCapacity()
- }
-
- return if (_isActivationOutput) {
- // If this output is the activation output, synchronously run the scheduler and return the new deadline
- val deadline = scheduler.runScheduler(now)
- if (deadline == Long.MAX_VALUE) {
- deadline
- } else {
- deadline - now
- }
- } else {
- // Output is not the activation output, so trigger activation output and do not install timer for this
- // output (by returning `Long.MAX_VALUE`)
- scheduler.trigger(now)
-
- Long.MAX_VALUE
- }
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- if (_isActivationOutput) {
- scheduler.convergeOutput(this, now)
- }
- }
-
- override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
deleted file mode 100644
index 6cfcc82c..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-import kotlin.math.roundToLong
-
-/**
- * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
- */
-public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
-
- init {
- require(amount >= 0.0) { "Amount must be positive" }
- require(utilization > 0.0) { "Utilization must be positive" }
- }
-
- private var remainingAmount = amount
- private var lastPull: Long = 0L
-
- override fun onStart(conn: FlowConnection, now: Long) {
- lastPull = now
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- val lastPull = lastPull
- this.lastPull = now
- val delta = (now - lastPull).coerceAtLeast(0)
-
- val consumed = conn.rate * delta / 1000.0
- val limit = conn.capacity * utilization
-
- remainingAmount -= consumed
-
- val duration = (remainingAmount / limit * 1000).roundToLong()
-
- return if (duration > 0) {
- conn.push(limit)
- duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
deleted file mode 100644
index 80127fb5..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * Helper class to expose an observable [rate] field describing the flow rate of the source.
- */
-public class FlowSourceRateAdapter(
- private val delegate: FlowSource,
- private val callback: (Double) -> Unit = {}
-) : FlowSource by delegate {
- /**
- * The resource processing speed at this instant.
- */
- public var rate: Double = 0.0
- private set(value) {
- if (field != value) {
- callback(value)
- field = value
- }
- }
-
- init {
- callback(0.0)
- }
-
- override fun onStart(conn: FlowConnection, now: Long) {
- conn.shouldSourceConverge = true
-
- delegate.onStart(conn, now)
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- try {
- delegate.onStop(conn, now)
- } finally {
- rate = 0.0
- }
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- return delegate.onPull(conn, now)
- }
-
- override fun onConverge(conn: FlowConnection, now: Long) {
- try {
- delegate.onConverge(conn, now)
- } finally {
- rate = conn.rate
- }
- }
-
- override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
deleted file mode 100644
index c9a52128..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2021 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow.source
-
-import org.opendc.simulator.flow.FlowConnection
-import org.opendc.simulator.flow.FlowSource
-
-/**
- * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
- */
-public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
- private var _iterator: Iterator<Fragment>? = null
- private var _nextTarget = Long.MIN_VALUE
-
- override fun onStart(conn: FlowConnection, now: Long) {
- check(_iterator == null) { "Source already running" }
- _iterator = trace.iterator()
- }
-
- override fun onStop(conn: FlowConnection, now: Long) {
- _iterator = null
- }
-
- override fun onPull(conn: FlowConnection, now: Long): Long {
- // Check whether the trace fragment was fully consumed, otherwise wait until we have done so
- val nextTarget = _nextTarget
- if (nextTarget > now) {
- return now - nextTarget
- }
-
- val iterator = checkNotNull(_iterator)
- return if (iterator.hasNext()) {
- val fragment = iterator.next()
- _nextTarget = now + fragment.duration
- conn.push(fragment.usage)
- fragment.duration
- } else {
- conn.close()
- Long.MAX_VALUE
- }
- }
-
- /**
- * A fragment of the trace.
- */
- public data class Fragment(val duration: Long, val usage: Double)
-}