summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java210
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java)24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java114
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java)88
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java112
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java191
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java)26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java)24
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java)77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java (renamed from opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java)21
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java63
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java93
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java312
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java54
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java214
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java47
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java224
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java51
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java287
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java297
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java123
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java65
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java128
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java151
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java41
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java124
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java57
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt (renamed from opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt)3
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt210
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt385
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt55
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt131
41 files changed, 732 insertions, 3794 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
index 6bf9c2a2..0ab051a4 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
@@ -93,8 +93,8 @@ class FlowBenchmarks {
val sinkA = SimpleFlowSink(graph, 3000.0f)
val sinkB = SimpleFlowSink(graph, 3000.0f)
- graph.connect(switch.newOutput(), sinkA.input)
- graph.connect(switch.newOutput(), sinkB.input)
+ graph.connect(switch.newOutPort(), sinkA.input)
+ graph.connect(switch.newOutPort(), sinkB.input)
val source = TraceFlowSource(graph, trace)
graph.connect(source.output, switch.newInput())
@@ -111,8 +111,8 @@ class FlowBenchmarks {
val sinkA = SimpleFlowSink(graph, 3000.0f)
val sinkB = SimpleFlowSink(graph, 3000.0f)
- graph.connect(switch.newOutput(), sinkA.input)
- graph.connect(switch.newOutput(), sinkB.input)
+ graph.connect(switch.newOutPort(), sinkA.input)
+ graph.connect(switch.newOutPort(), sinkB.input)
repeat(3) {
launch {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
new file mode 100644
index 00000000..0af2499a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/Multiplexer.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.opendc.simulator.engine.FlowConsumer;
+import org.opendc.simulator.engine.FlowEdge;
+import org.opendc.simulator.engine.FlowGraph;
+import org.opendc.simulator.engine.FlowNode;
+import org.opendc.simulator.engine.FlowSupplier;
+
+public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer {
+ private ArrayList<FlowEdge> consumerEdges = new ArrayList<>();
+ private FlowEdge supplierEdge;
+
+ private ArrayList<Float> demands = new ArrayList<>(); // What is demanded by the consumers
+ private ArrayList<Float> supplies = new ArrayList<>(); // What is supplied to the consumers
+
+ private float totalDemand; // The total demand of all the consumers
+ private float totalSupply; // The total supply from the supplier
+ private float capacity; // What is the max capacity
+
+ public Multiplexer(FlowGraph graph) {
+ super(graph);
+ }
+
+ public float getTotalDemand() {
+ return totalDemand;
+ }
+
+ public float getTotalSupply() {
+ return totalSupply;
+ }
+
+ public float getCapacity() {
+ return capacity;
+ }
+
+ public long onUpdate(long now) {
+
+ if (this.totalDemand > this.capacity) {
+ redistributeSupply(this.consumerEdges, this.supplies, this.capacity);
+ } else {
+ for (int i = 0; i < this.demands.size(); i++) {
+ this.supplies.set(i, this.demands.get(i));
+ }
+ }
+
+ float totalSupply = 0;
+ for (int i = 0; i < this.consumerEdges.size(); i++) {
+ this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i));
+ totalSupply += this.supplies.get(i);
+ }
+
+ // Only update supplier if supply has changed
+ if (this.totalSupply != totalSupply) {
+ this.totalSupply = totalSupply;
+
+ pushDemand(this.supplierEdge, this.totalSupply);
+ }
+
+ return Long.MAX_VALUE;
+ }
+
+ private static float redistributeSupply(
+ ArrayList<FlowEdge> consumerEdges, ArrayList<Float> supplies, float capacity) {
+ final long[] consumers = new long[consumerEdges.size()];
+
+ for (int i = 0; i < consumers.length; i++) {
+ FlowEdge consumer = consumerEdges.get(i);
+
+ if (consumer == null) {
+ break;
+ }
+
+ consumers[i] = ((long) Float.floatToRawIntBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL);
+ }
+ Arrays.sort(consumers);
+
+ float availableCapacity = capacity;
+ int inputSize = consumers.length;
+
+ for (int i = 0; i < inputSize; i++) {
+ long v = consumers[i];
+ int slot = (int) v;
+ float d = Float.intBitsToFloat((int) (v >> 32));
+
+ if (d == 0.0) {
+ continue;
+ }
+
+ float availableShare = availableCapacity / (inputSize - i);
+ float r = Math.min(d, availableShare);
+
+ supplies.set(slot, r); // Update the rates
+ availableCapacity -= r;
+ }
+
+ // Return the used capacity
+ return capacity - availableCapacity;
+ }
+
+ /**
+ * Add a new consumer.
+ * Set its demand and supply to 0.0
+ */
+ @Override
+ public void addConsumerEdge(FlowEdge consumerEdge) {
+ this.consumerEdges.add(consumerEdge);
+ this.demands.add(0f);
+ this.supplies.add(0f);
+ }
+
+ @Override
+ public void addSupplierEdge(FlowEdge supplierEdge) {
+ this.supplierEdge = supplierEdge;
+ this.capacity = supplierEdge.getCapacity();
+ this.totalSupply = 0;
+ }
+
+ @Override
+ public void removeConsumerEdge(FlowEdge consumerEdge) {
+ int idx = this.consumerEdges.indexOf(consumerEdge);
+
+ if (idx == -1) {
+ return;
+ }
+
+ this.totalDemand -= consumerEdge.getDemand();
+
+ this.consumerEdges.remove(idx);
+ this.demands.remove(idx);
+ this.supplies.remove(idx);
+
+ this.invalidate();
+ }
+
+ @Override
+ public void removeSupplierEdge(FlowEdge supplierEdge) {
+ this.supplierEdge = null;
+ this.capacity = 0;
+ this.totalSupply = 0;
+ }
+
+ @Override
+ public void handleDemand(FlowEdge consumerEdge, float newDemand) {
+ int idx = consumerEdges.indexOf(consumerEdge);
+
+ if (idx == -1) {
+ System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer");
+ return;
+ }
+
+ float prevDemand = demands.get(idx);
+ demands.set(idx, newDemand);
+
+ this.totalDemand += (newDemand - prevDemand);
+ }
+
+ @Override
+ public void handleSupply(FlowEdge supplierEdge, float newSupply) {
+ if (newSupply == this.totalSupply) {
+ return;
+ }
+
+ this.totalSupply = newSupply;
+ }
+
+ @Override
+ public void pushDemand(FlowEdge supplierEdge, float newDemand) {
+ this.supplierEdge.pushDemand(newDemand);
+ }
+
+ @Override
+ public void pushSupply(FlowEdge consumerEdge, float newSupply) {
+ int idx = consumerEdges.indexOf(consumerEdge);
+
+ if (idx == -1) {
+ System.out.println("Error (Multiplexer): pushing supply to an unknown consumer");
+ }
+
+ if (newSupply == supplies.get(idx)) {
+ return;
+ }
+
+ supplies.set(idx, newSupply);
+ consumerEdge.pushSupply(newSupply);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java
index 4a9ea6a5..7ba5dea7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowConsumer.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 AtLarge Research
+ * Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,19 +20,15 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2;
+package org.opendc.simulator.engine;
-/**
- * An in-going edge in a {@link FlowGraph}.
- */
-public interface Inlet {
- /**
- * Return the {@link FlowGraph} to which the inlet is exposed.
- */
- FlowGraph getGraph();
+public interface FlowConsumer {
+
+ void handleSupply(FlowEdge supplierEdge, float newSupply);
+
+ void pushDemand(FlowEdge supplierEdge, float newDemand);
+
+ void addSupplierEdge(FlowEdge supplierEdge);
- /**
- * Return the name of the inlet.
- */
- String getName();
+ void removeSupplierEdge(FlowEdge supplierEdge);
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
new file mode 100644
index 00000000..0edc9e68
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEdge.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine;
+
+/**
+ * An edge that connects two FlowStages.
+ * A connection between FlowStages always consist of a FlowStage that demands
+ * something, and a FlowStage that Delivers something
+ * For instance, this could be the connection between a workload, and its machine
+ */
+public class FlowEdge {
+ private FlowConsumer consumer;
+ private FlowSupplier supplier;
+
+ private float demand = 0.0f;
+ private float supply = 0.0f;
+
+ private float capacity;
+
+ public FlowEdge(FlowConsumer consumer, FlowSupplier supplier) {
+ if (!(consumer instanceof FlowNode)) {
+ throw new IllegalArgumentException("Flow consumer is not a FlowNode");
+ }
+ if (!(supplier instanceof FlowNode)) {
+ throw new IllegalArgumentException("Flow consumer is not a FlowNode");
+ }
+
+ this.consumer = consumer;
+ this.supplier = supplier;
+
+ this.capacity = supplier.getCapacity();
+
+ this.consumer.addSupplierEdge(this);
+ this.supplier.addConsumerEdge(this);
+ }
+
+ public void close() {
+ if (this.consumer != null) {
+ this.consumer.removeSupplierEdge(this);
+ this.consumer = null;
+ }
+
+ if (this.supplier != null) {
+ this.supplier.removeConsumerEdge(this);
+ this.supplier = null;
+ }
+ }
+
+ public FlowConsumer getConsumer() {
+ return consumer;
+ }
+
+ public FlowSupplier getSupplier() {
+ return supplier;
+ }
+
+ public float getCapacity() {
+ return capacity;
+ }
+
+ public float getDemand() {
+ return this.demand;
+ }
+
+ public float getSupply() {
+ return this.supply;
+ }
+
+ /**
+ * Push new demand from the Consumer to the Supplier
+ */
+ public void pushDemand(float newDemand) {
+ if (newDemand == this.demand) {
+ return;
+ }
+
+ this.demand = newDemand;
+ this.supplier.handleDemand(this, newDemand);
+ ((FlowNode) this.supplier).invalidate();
+ }
+
+ /**
+ * Push new supply from the Supplier to the Consumer
+ */
+ public void pushSupply(float newSupply) {
+ if (newSupply == this.supply) {
+ return;
+ }
+
+ this.supply = newSupply;
+ this.consumer.handleSupply(this, newSupply);
+ ((FlowNode) this.consumer).invalidate();
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java
index c0f52505..10af7c51 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowEngine.java
@@ -20,12 +20,10 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2;
+package org.opendc.simulator.engine;
import java.time.Clock;
import java.time.InstantSource;
-import java.util.ArrayList;
-import java.util.List;
import kotlin.coroutines.CoroutineContext;
import org.opendc.common.Dispatcher;
@@ -37,12 +35,12 @@ import org.opendc.common.Dispatcher;
*/
public final class FlowEngine implements Runnable {
/**
- * The queue of {@link FlowStage} updates that are scheduled for immediate execution.
+ * The queue of {@link FlowNode} updates that are scheduled for immediate execution.
*/
- private final FlowStageQueue queue = new FlowStageQueue(256);
+ private final FlowNodeQueue queue = new FlowNodeQueue(256);
/**
- * A priority queue containing the {@link FlowStage} updates to be scheduled in the future.
+ * A priority queue containing the {@link FlowNode} updates to be scheduled in the future.
*/
private final FlowTimerQueue timerQueue = new FlowTimerQueue(256);
@@ -82,16 +80,16 @@ public final class FlowEngine implements Runnable {
* Return a new {@link FlowGraph} that can be used to build a flow network.
*/
public FlowGraph newGraph() {
- return new RootGraph(this);
+ return new FlowGraph(this);
}
/**
- * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle.
+ * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle.
* <p>
* This method should be used when the state of a flow context is invalidated/interrupted and needs to be
* re-computed.
*/
- void scheduleImmediate(long now, FlowStage ctx) {
+ void scheduleImmediate(long now, FlowNode ctx) {
scheduleImmediateInContext(ctx);
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
@@ -104,21 +102,21 @@ public final class FlowEngine implements Runnable {
}
/**
- * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle.
+ * Enqueue the specified {@link FlowNode} to be updated immediately during the active engine cycle.
* <p>
* This method should be used when the state of a flow context is invalidated/interrupted and needs to be
* re-computed.
* <p>
* This method should only be invoked while inside an engine cycle.
*/
- void scheduleImmediateInContext(FlowStage ctx) {
+ void scheduleImmediateInContext(FlowNode ctx) {
queue.add(ctx);
}
/**
- * Enqueue the specified {@link FlowStage} to be updated at its updated deadline.
+ * Enqueue the specified {@link FlowNode} to be updated at its updated deadline.
*/
- void scheduleDelayed(FlowStage ctx) {
+ void scheduleDelayed(FlowNode ctx) {
scheduleDelayedInContext(ctx);
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
@@ -134,11 +132,11 @@ public final class FlowEngine implements Runnable {
}
/**
- * Enqueue the specified {@link FlowStage} to be updated at its updated deadline.
+ * Enqueue the specified {@link FlowNode} to be updated at its updated deadline.
* <p>
* This method should only be invoked while inside an engine cycle.
*/
- void scheduleDelayedInContext(FlowStage ctx) {
+ void scheduleDelayedInContext(FlowNode ctx) {
FlowTimerQueue timerQueue = this.timerQueue;
timerQueue.enqueue(ctx);
}
@@ -147,7 +145,7 @@ public final class FlowEngine implements Runnable {
* Run all the enqueued actions for the specified timestamp (<code>now</code>).
*/
private void doRunEngine(long now) {
- final FlowStageQueue queue = this.queue;
+ final FlowNodeQueue queue = this.queue;
final FlowTimerQueue timerQueue = this.timerQueue;
try {
@@ -156,22 +154,22 @@ public final class FlowEngine implements Runnable {
// Execute all scheduled updates at current timestamp
while (true) {
- final FlowStage ctx = timerQueue.poll(now);
+ final FlowNode ctx = timerQueue.poll(now);
if (ctx == null) {
break;
}
- ctx.onUpdate(now);
+ ctx.update(now);
}
// Execute all immediate updates
while (true) {
- final FlowStage ctx = queue.poll();
+ final FlowNode ctx = queue.poll();
if (ctx == null) {
break;
}
- ctx.onUpdate(now);
+ ctx.update(now);
}
} finally {
active = false;
@@ -203,54 +201,4 @@ public final class FlowEngine implements Runnable {
dispatcher.schedule(target - now, this);
}
}
-
- /**
- * Internal implementation of a root {@link FlowGraph}.
- */
- private static final class RootGraph implements FlowGraphInternal {
- private final FlowEngine engine;
- private final List<FlowStage> stages = new ArrayList<>();
-
- public RootGraph(FlowEngine engine) {
- this.engine = engine;
- }
-
- @Override
- public FlowEngine getEngine() {
- return engine;
- }
-
- @Override
- public FlowStage newStage(FlowStageLogic logic) {
- final FlowEngine engine = this.engine;
- final FlowStage stage = new FlowStage(this, logic);
- stages.add(stage);
- long now = engine.getClock().millis();
- stage.invalidate(now);
- return stage;
- }
-
- @Override
- public void connect(Outlet outlet, Inlet inlet) {
- FlowGraphInternal.connect(this, outlet, inlet);
- }
-
- @Override
- public void disconnect(Outlet outlet) {
- FlowGraphInternal.disconnect(this, outlet);
- }
-
- @Override
- public void disconnect(Inlet inlet) {
- FlowGraphInternal.disconnect(this, inlet);
- }
-
- /**
- * Internal method to remove the specified {@link FlowStage} from the graph.
- */
- @Override
- public void detach(FlowStage stage) {
- stages.remove(stage);
- }
- }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java
new file mode 100644
index 00000000..d82b542b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowGraph.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2024 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class FlowGraph {
+ private final FlowEngine engine;
+ private final ArrayList<FlowNode> nodes = new ArrayList<>();
+ private final ArrayList<FlowEdge> edges = new ArrayList<>();
+ private final HashMap<FlowNode, ArrayList<FlowEdge>> nodeToEdge = new HashMap<>();
+
+ public FlowGraph(FlowEngine engine) {
+ this.engine = engine;
+ }
+
+ /**
+ * Return the {@link FlowEngine} driving the simulation of the graph.
+ */
+ public FlowEngine getEngine() {
+ return engine;
+ }
+
+ /**
+ * Create a new {@link FlowNode} representing a node in the flow network.
+ */
+ public void addNode(FlowNode node) {
+ if (nodes.contains(node)) {
+ System.out.println("Node already exists");
+ }
+ nodes.add(node);
+ nodeToEdge.put(node, new ArrayList<>());
+ long now = this.engine.getClock().millis();
+ node.invalidate(now);
+ }
+
+ /**
+ * Internal method to remove the specified {@link FlowNode} from the graph.
+ */
+ public void removeNode(FlowNode node) {
+
+ // Remove all edges connected to node
+ final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node);
+ while (connectedEdges.size() > 0) {
+ removeEdge(connectedEdges.get(0));
+ }
+
+ nodeToEdge.remove(node);
+
+ // remove the node
+ nodes.remove(node);
+ }
+
+ /**
+ * Add an edge between the specified consumer and supplier in this graph.
+ */
+ public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) {
+ // Check if the consumer and supplier are both FlowNodes
+ if (!(flowConsumer instanceof FlowNode)) {
+ throw new IllegalArgumentException("Flow consumer is not a FlowNode");
+ }
+ if (!(flowSupplier instanceof FlowNode)) {
+ throw new IllegalArgumentException("Flow consumer is not a FlowNode");
+ }
+
+ // Check of the consumer and supplier are present in this graph
+ if (!(this.nodes.contains((FlowNode) flowConsumer))) {
+ throw new IllegalArgumentException("The consumer is not a node in this graph");
+ }
+ if (!(this.nodes.contains((FlowNode) flowSupplier))) {
+ throw new IllegalArgumentException("The consumer is not a node in this graph");
+ }
+
+ final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier);
+
+ edges.add(flowEdge);
+
+ nodeToEdge.get((FlowNode) flowConsumer).add(flowEdge);
+ nodeToEdge.get((FlowNode) flowSupplier).add(flowEdge);
+ }
+
+ public void removeEdge(FlowEdge flowEdge) {
+ final FlowConsumer consumer = flowEdge.getConsumer();
+ final FlowSupplier supplier = flowEdge.getSupplier();
+ nodeToEdge.get((FlowNode) consumer).remove(flowEdge);
+ nodeToEdge.get((FlowNode) supplier).remove(flowEdge);
+
+ edges.remove(flowEdge);
+ flowEdge.close();
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java
new file mode 100644
index 00000000..d1faf465
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNode.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.engine;
+
+import java.time.InstantSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FlowNode} represents a node in a {@link FlowGraph}.
+ */
+public abstract class FlowNode {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowNode.class);
+
+ protected enum NodeState {
+ PENDING, // Stage is active, but is not running any updates
+ UPDATING, // Stage is active, and running an update
+ INVALIDATED, // Stage is deemed invalid, and should run an update
+ CLOSING, // Stage is being closed, final updates can still be run
+ CLOSED // Stage is closed and should not run any updates
+ }
+
+ protected NodeState nodeState = NodeState.PENDING;
+
+ /**
+ * The deadline of the stage after which an update should run.
+ */
+ long deadline = Long.MAX_VALUE;
+
+ /**
+ * The index of the timer in the {@link FlowTimerQueue}.
+ */
+ int timerIndex = -1;
+
+ protected InstantSource clock;
+ protected FlowGraph parentGraph;
+ protected FlowEngine engine;
+
+ /**
+ * Construct a new {@link FlowNode} instance.
+ *
+ * @param parentGraph The {@link FlowGraph} this stage belongs to.
+ */
+ public FlowNode(FlowGraph parentGraph) {
+ this.parentGraph = parentGraph;
+ this.engine = parentGraph.getEngine();
+ this.clock = engine.getClock();
+
+ this.parentGraph.addNode(this);
+ }
+
+ /**
+ * Return the {@link FlowGraph} to which this stage belongs.
+ */
+ public FlowGraph getGraph() {
+ return parentGraph;
+ }
+
+ /**
+ * Return the current deadline of the {@link FlowNode}'s timer (in milliseconds after epoch).
+ */
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
+
+ public void setTimerIndex(int index) {
+ this.timerIndex = index;
+ }
+ /**
+ * Invalidate the {@link FlowNode} forcing the stage to update.
+ *
+ * <p>
+ * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to
+ * prevent having to re-query the clock. This method should not be called during an update.
+ */
+ public void invalidate(long now) {
+ // If there is already an update running,
+ // notify the update, that a next update should be run after
+ if (this.nodeState == NodeState.UPDATING) {
+ this.nodeState = NodeState.INVALIDATED;
+ } else {
+ engine.scheduleImmediate(now, this);
+ }
+ }
+
+ /**
+ * Invalidate the {@link FlowNode} forcing the stage to update.
+ */
+ public void invalidate() {
+ invalidate(clock.millis());
+ }
+
+ /**
+ * Update the state of the stage.
+ */
+ public void update(long now) {
+ this.nodeState = NodeState.UPDATING;
+
+ long newDeadline = this.deadline;
+
+ try {
+ newDeadline = this.onUpdate(now);
+ } catch (Exception e) {
+ doFail(e);
+ }
+
+ // Check whether the stage is marked as closing.
+ if (this.nodeState == NodeState.INVALIDATED) {
+ newDeadline = now;
+ }
+ if (this.nodeState == NodeState.CLOSING) {
+ closeNode();
+ return;
+ }
+
+ this.deadline = newDeadline;
+
+ // Update the timer queue with the new deadline
+ engine.scheduleDelayedInContext(this);
+
+ this.nodeState = NodeState.PENDING;
+ }
+
+ /**
+ * This method is invoked when the one of the stage's InPorts or OutPorts is invalidated.
+ *
+ * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring.
+ * @return The next deadline for the stage.
+ */
+ public abstract long onUpdate(long now);
+
+ /**
+ * This method is invoked when an uncaught exception is caught by the engine. When this happens, the
+ */
+ void doFail(Throwable cause) {
+ LOGGER.warn("Uncaught exception (closing stage)", cause);
+
+ closeNode();
+ }
+
+ /**
+ * This method is invoked when the {@link FlowNode} exits successfully or due to failure.
+ */
+ public void closeNode() {
+ if (this.nodeState == NodeState.CLOSED) {
+ // LOGGER.warn("Flowstage:doClose() => Tried closing a stage that was already closed");
+ return;
+ }
+
+ // If this stage is running an update, notify it that is should close after.
+ if (this.nodeState == NodeState.UPDATING) {
+ // LOGGER.warn("Flowstage:doClose() => Tried closing a stage, but update was active");
+ this.nodeState = NodeState.CLOSING;
+ return;
+ }
+
+ // Mark the stage as closed
+ this.nodeState = NodeState.CLOSED;
+
+ // Remove stage from parent graph
+ this.parentGraph.removeNode(this);
+
+ // Remove stage from the timer queue
+ this.deadline = Long.MAX_VALUE;
+ this.engine.scheduleDelayedInContext(this);
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java
index 56ec7702..37b3c65b 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowNodeQueue.java
@@ -20,35 +20,35 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2;
+package org.opendc.simulator.engine;
import java.util.ArrayDeque;
import java.util.Arrays;
/**
- * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s
+ * A specialized {@link ArrayDeque} implementation that contains the {@link FlowNode}s
* that have been updated during the engine cycle and should converge.
* <p>
* By using a specialized class, we reduce the overhead caused by type-erasure.
*/
-final class FlowStageQueue {
+final class FlowNodeQueue {
/**
* The array of elements in the queue.
*/
- private FlowStage[] elements;
+ private FlowNode[] elements;
private int head = 0;
private int tail = 0;
- public FlowStageQueue(int initialCapacity) {
- elements = new FlowStage[initialCapacity];
+ public FlowNodeQueue(int initialCapacity) {
+ elements = new FlowNode[initialCapacity];
}
/**
* Add the specified context to the queue.
*/
- void add(FlowStage ctx) {
- final FlowStage[] es = elements;
+ void add(FlowNode ctx) {
+ final FlowNode[] es = elements;
int tail = this.tail;
es[tail] = ctx;
@@ -62,12 +62,12 @@ final class FlowStageQueue {
}
/**
- * Remove a {@link FlowStage} from the queue or <code>null</code> if the queue is empty.
+ * Remove a {@link FlowNode} from the queue or <code>null</code> if the queue is empty.
*/
- FlowStage poll() {
- final FlowStage[] es = elements;
+ FlowNode poll() {
+ final FlowNode[] es = elements;
int head = this.head;
- FlowStage ctx = es[head];
+ FlowNode ctx = es[head];
if (ctx != null) {
es[head] = null;
@@ -87,7 +87,7 @@ final class FlowStageQueue {
throw new IllegalStateException("Sorry, deque too big");
}
- final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity);
+ final FlowNode[] es = elements = Arrays.copyOf(elements, newCapacity);
// Exceptionally, here tail == head needs to be disambiguated
if (tail < head || (tail == head && es[head] != null)) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java
index f9432c33..87729fca 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowSupplier.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2022 AtLarge Research
+ * Copyright (c) 2024 AtLarge Research
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -20,17 +20,17 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2.source;
+package org.opendc.simulator.engine;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.Outlet;
+public interface FlowSupplier {
-/**
- * A {@link FlowStage} with a single output.
- */
-public interface FlowSource {
- /**
- * Return the output of this {@link FlowSource}.
- */
- Outlet getOutput();
+ void handleDemand(FlowEdge consumerEdge, float newDemand);
+
+ void pushSupply(FlowEdge consumerEdge, float newSupply);
+
+ void addConsumerEdge(FlowEdge consumerEdge);
+
+ void removeConsumerEdge(FlowEdge consumerEdge);
+
+ float getCapacity();
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java
index 4b746202..1e348b10 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/FlowTimerQueue.java
@@ -20,21 +20,21 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2;
+package org.opendc.simulator.engine;
import java.util.Arrays;
/**
- * A specialized priority queue for timers of {@link FlowStageLogic}s.
+ * A specialized priority queue for timers of {@link FlowNode}s.
* <p>
* By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
* being generic.
*/
-final class FlowTimerQueue {
+public final class FlowTimerQueue {
/**
- * Array representation of binary heap of {@link FlowStage} instances.
+ * Array representation of binary heap of {@link FlowNode} instances.
*/
- private FlowStage[] queue;
+ private FlowNode[] queue;
/**
* The number of elements in the priority queue.
@@ -47,21 +47,21 @@ final class FlowTimerQueue {
* @param initialCapacity The initial capacity of the queue.
*/
public FlowTimerQueue(int initialCapacity) {
- this.queue = new FlowStage[initialCapacity];
+ this.queue = new FlowNode[initialCapacity];
}
/**
* Enqueue a timer for the specified context or update the existing timer.
*/
- void enqueue(FlowStage ctx) {
- FlowStage[] es = queue;
- int k = ctx.timerIndex;
+ public void enqueue(FlowNode node) {
+ FlowNode[] es = queue;
+ int k = node.timerIndex;
- if (ctx.deadline != Long.MAX_VALUE) {
+ if (node.deadline != Long.MAX_VALUE) {
if (k >= 0) {
- update(es, ctx, k);
+ update(es, node, k);
} else {
- add(es, ctx);
+ add(es, node);
}
} else if (k >= 0) {
delete(es, k);
@@ -74,14 +74,13 @@ final class FlowTimerQueue {
* @param now The timestamp that the deadline of the head of the queue should not exceed.
* @return The head of the queue if its deadline does not exceed <code>now</code>, otherwise <code>null</code>.
*/
- FlowStage poll(long now) {
- int size = this.size;
- if (size == 0) {
+ public FlowNode poll(long now) {
+ if (this.size == 0) {
return null;
}
- final FlowStage[] es = queue;
- final FlowStage head = es[0];
+ final FlowNode[] es = queue;
+ final FlowNode head = es[0];
if (now < head.deadline) {
return null;
@@ -89,7 +88,7 @@ final class FlowTimerQueue {
int n = size - 1;
this.size = n;
- final FlowStage next = es[n];
+ final FlowNode next = es[n];
es[n] = null; // Clear the last element of the queue
if (n > 0) {
@@ -103,9 +102,9 @@ final class FlowTimerQueue {
/**
* Find the earliest deadline in the queue.
*/
- long peekDeadline() {
- if (size > 0) {
- return queue[0].deadline;
+ public long peekDeadline() {
+ if (this.size > 0) {
+ return this.queue[0].deadline;
}
return Long.MAX_VALUE;
@@ -114,43 +113,41 @@ final class FlowTimerQueue {
/**
* Add a new entry to the queue.
*/
- private void add(FlowStage[] es, FlowStage ctx) {
- int i = size;
-
- if (i >= es.length) {
+ private void add(FlowNode[] es, FlowNode node) {
+ if (this.size >= es.length) {
// Re-fetch the resized array
es = grow();
}
- siftUp(i, ctx, es);
+ siftUp(this.size, node, es);
- size = i + 1;
+ this.size++;
}
/**
* Update the deadline of an existing entry in the queue.
*/
- private void update(FlowStage[] es, FlowStage ctx, int k) {
+ private void update(FlowNode[] es, FlowNode node, int k) {
if (k > 0) {
int parent = (k - 1) >>> 1;
- if (es[parent].deadline > ctx.deadline) {
- siftUp(k, ctx, es);
+ if (es[parent].deadline > node.deadline) {
+ siftUp(k, node, es);
return;
}
}
- siftDown(k, ctx, es, size);
+ siftDown(k, node, es, this.size);
}
/**
* Deadline an entry from the queue.
*/
- private void delete(FlowStage[] es, int k) {
- int s = --size;
+ private void delete(FlowNode[] es, int k) {
+ int s = --this.size;
if (s == k) {
es[k] = null; // Element is last in the queue
} else {
- FlowStage moved = es[s];
+ FlowNode moved = es[s];
es[s] = null;
siftDown(k, moved, es, s);
@@ -164,8 +161,8 @@ final class FlowTimerQueue {
/**
* Increases the capacity of the array.
*/
- private FlowStage[] grow() {
- FlowStage[] queue = this.queue;
+ private FlowNode[] grow() {
+ FlowNode[] queue = this.queue;
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
@@ -174,10 +171,10 @@ final class FlowTimerQueue {
return queue;
}
- private static void siftUp(int k, FlowStage key, FlowStage[] es) {
+ private static void siftUp(int k, FlowNode key, FlowNode[] es) {
while (k > 0) {
int parent = (k - 1) >>> 1;
- FlowStage e = es[parent];
+ FlowNode e = es[parent];
if (key.deadline >= e.deadline) break;
es[k] = e;
e.timerIndex = k;
@@ -187,11 +184,11 @@ final class FlowTimerQueue {
key.timerIndex = k;
}
- private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) {
+ private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) {
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
- FlowStage c = es[child];
+ FlowNode c = es[child];
int right = child + 1;
if (right < n && c.deadline > es[right].deadline) c = es[child = right];
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java
index a5b5114b..15da2f23 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/InvocationStack.java
@@ -20,7 +20,7 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2;
+package org.opendc.simulator.engine;
import java.util.Arrays;
@@ -29,7 +29,7 @@ import java.util.Arrays;
* <p>
* By using a specialized class, we reduce the overhead caused by type-erasure.
*/
-final class InvocationStack {
+public final class InvocationStack {
/**
* The array of elements in the stack.
*/
@@ -38,8 +38,8 @@ final class InvocationStack {
private int head = -1;
public InvocationStack(int initialCapacity) {
- elements = new long[initialCapacity];
- Arrays.fill(elements, Long.MIN_VALUE);
+ this.elements = new long[initialCapacity];
+ Arrays.fill(this.elements, Long.MIN_VALUE);
}
/**
@@ -48,8 +48,8 @@ final class InvocationStack {
* @param invocation The timestamp of the invocation.
* @return <code>true</code> if the invocation was added, <code>false</code> otherwise.
*/
- boolean tryAdd(long invocation) {
- final long[] es = elements;
+ public boolean tryAdd(long invocation) {
+ final long[] es = this.elements;
int head = this.head;
if (head < 0 || es[head] > invocation) {
@@ -69,12 +69,11 @@ final class InvocationStack {
/**
* Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty.
*/
- long poll() {
- final long[] es = elements;
+ public long poll() {
int head = this.head--;
if (head >= 0) {
- return es[head];
+ return this.elements[head];
}
return Long.MAX_VALUE;
@@ -84,12 +83,12 @@ final class InvocationStack {
* Doubles the capacity of this deque
*/
private void doubleCapacity() {
- int oldCapacity = elements.length;
+ int oldCapacity = this.elements.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) {
throw new IllegalStateException("Sorry, deque too big");
}
- elements = Arrays.copyOf(elements, newCapacity);
+ this.elements = Arrays.copyOf(this.elements, newCapacity);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java
deleted file mode 100644
index f45be6cd..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an
- * amount of flow that cannot exceed the edge's capacity.
- */
-public interface FlowGraph {
- /**
- * Return the {@link FlowEngine} driving the simulation of the graph.
- */
- FlowEngine getEngine();
-
- /**
- * Create a new {@link FlowStage} representing a node in the flow network.
- *
- * @param logic The logic for handling the events of the stage.
- */
- FlowStage newStage(FlowStageLogic logic);
-
- /**
- * Add an edge between the specified outlet port and inlet port in this graph.
- *
- * @param outlet The outlet of the source from which the flow originates.
- * @param inlet The inlet of the sink that should receive the flow.
- */
- void connect(Outlet outlet, Inlet inlet);
-
- /**
- * Disconnect the specified {@link Outlet} (if connected).
- *
- * @param outlet The outlet to disconnect.
- */
- void disconnect(Outlet outlet);
-
- /**
- * Disconnect the specified {@link Inlet} (if connected).
- *
- * @param inlet The inlet to disconnect.
- */
- void disconnect(Inlet inlet);
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java
deleted file mode 100644
index 0f608b60..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * Interface implemented by {@link FlowGraph} implementations.
- */
-interface FlowGraphInternal extends FlowGraph {
- /**
- * Internal method to remove the specified {@link FlowStage} from the graph.
- */
- void detach(FlowStage stage);
-
- /**
- * Helper method to connect an outlet to an inlet.
- */
- static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) {
- if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) {
- throw new IllegalArgumentException("Invalid outlet or inlet passed to graph");
- }
-
- InPort inPort = (InPort) inlet;
- OutPort outPort = (OutPort) outlet;
-
- if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) {
- throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
- } else if (outPort.input != null || inPort.output != null) {
- throw new IllegalStateException("Inlet or outlet already connected");
- }
-
- outPort.input = inPort;
- inPort.output = outPort;
-
- inPort.connect();
- outPort.connect();
- }
-
- /**
- * Helper method to disconnect an outlet.
- */
- static void disconnect(FlowGraph graph, Outlet outlet) {
- if (!(outlet instanceof OutPort)) {
- throw new IllegalArgumentException("Invalid outlet passed to graph");
- }
-
- OutPort outPort = (OutPort) outlet;
-
- if (!graph.equals(outPort.getGraph())) {
- throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
- }
-
- outPort.cancel(null);
- outPort.complete();
- }
-
- /**
- * Helper method to disconnect an inlet.
- */
- static void disconnect(FlowGraph graph, Inlet inlet) {
- if (!(inlet instanceof InPort)) {
- throw new IllegalArgumentException("Invalid outlet passed to graph");
- }
-
- InPort inPort = (InPort) inlet;
-
- if (!graph.equals(inPort.getGraph())) {
- throw new IllegalArgumentException("Outlet or inlet does not belong to graph");
- }
-
- inPort.finish(null);
- inPort.cancel(null);
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
deleted file mode 100644
index 25f87e04..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-import java.time.InstantSource;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link FlowStage} represents a node in a {@link FlowGraph}.
- */
-public final class FlowStage {
- private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class);
-
- /**
- * States of the flow stage.
- */
- private static final int STAGE_PENDING = 0; // Stage is pending to be started
-
- private static final int STAGE_ACTIVE = 1; // Stage is actively running
- private static final int STAGE_CLOSED = 2; // Stage is closed
- private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage
-
- /**
- * Flags of the flow connection
- */
- private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated
-
- private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed
- private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active
- private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending
-
- /**
- * The flags representing the state and pending actions for the stage.
- */
- private int flags = STAGE_PENDING;
-
- /**
- * The deadline of the stage after which an update should run.
- */
- long deadline = Long.MAX_VALUE;
-
- /**
- * The index of the timer in the {@link FlowTimerQueue}.
- */
- int timerIndex = -1;
-
- final InstantSource clock;
- private final FlowStageLogic logic;
- final FlowGraphInternal parentGraph;
- private final FlowEngine engine;
-
- private final Map<String, InPort> inlets = new HashMap<>();
- private final Map<String, OutPort> outlets = new HashMap<>();
- private int nextInlet = 0;
- private int nextOutlet = 0;
-
- /**
- * Construct a new {@link FlowStage} instance.
- *
- * @param parentGraph The {@link FlowGraph} this stage belongs to.
- * @param logic The logic of the stage.
- */
- FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) {
- this.parentGraph = parentGraph;
- this.logic = logic;
- this.engine = parentGraph.getEngine();
- this.clock = engine.getClock();
- }
-
- /**
- * Return the {@link FlowGraph} to which this stage belongs.
- */
- public FlowGraph getGraph() {
- return parentGraph;
- }
-
- /**
- * Return the {@link Inlet} (an in-going edge) with the specified <code>name</code> for this {@link FlowStage}.
- * If an inlet with that name does not exist, a new one is allocated for the stage.
- *
- * @param name The name of the inlet.
- * @return The {@link InPort} representing an {@link Inlet} with the specified <code>name</code>.
- */
- public InPort getInlet(String name) {
- return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++));
- }
-
- /**
- * Return the {@link Outlet} (an out-going edge) with the specified <code>name</code> for this {@link FlowStage}.
- * If an outlet with that name does not exist, a new one is allocated for the stage.
- *
- * @param name The name of the outlet.
- * @return The {@link OutPort} representing an {@link Outlet} with the specified <code>name</code>.
- */
- public OutPort getOutlet(String name) {
- return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++));
- }
-
- /**
- * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch).
- */
- public long getDeadline() {
- return deadline;
- }
-
- /**
- * Set the deadline of the {@link FlowStage}'s timer.
- *
- * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted.
- */
- public void setDeadline(long deadline) {
- this.deadline = deadline;
-
- if ((flags & STAGE_UPDATE_ACTIVE) == 0) {
- // Update the timer queue with the new deadline
- engine.scheduleDelayed(this);
- }
- }
-
- /**
- * Invalidate the {@link FlowStage} forcing the stage to update.
- */
- public void invalidate() {
- int flags = this.flags;
-
- if ((flags & STAGE_UPDATE_ACTIVE) == 0) {
- scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE);
- }
- }
-
- /**
- * Synchronously update the {@link FlowStage} at the current timestamp.
- */
- public void sync() {
- this.flags |= STAGE_INVALIDATE;
- onUpdate(clock.millis());
- engine.scheduleDelayed(this);
- }
-
- /**
- * Close the {@link FlowStage} and disconnect all inlets and outlets.
- */
- public void close() {
- int flags = this.flags;
-
- if ((flags & STAGE_STATE) == STAGE_CLOSED) {
- return;
- }
-
- // Toggle the close bit. In case no update is active, schedule a new update.
- if ((flags & STAGE_UPDATE_ACTIVE) != 0) {
- this.flags = flags | STAGE_CLOSE;
- } else {
- scheduleImmediate(clock.millis(), flags | STAGE_CLOSE);
- }
- }
-
- /**
- * Update the state of the flow stage.
- *
- * @param now The current virtual timestamp.
- */
- void onUpdate(long now) {
- int flags = this.flags;
- int state = flags & STAGE_STATE;
-
- if (state == STAGE_ACTIVE) {
- doUpdate(now, flags);
- } else if (state == STAGE_PENDING) {
- doStart(now, flags);
- }
- }
-
- /**
- * Invalidate the {@link FlowStage} forcing the stage to update.
- *
- * <p>
- * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to
- * prevent having to re-query the clock. This method should not be called during an update.
- */
- void invalidate(long now) {
- scheduleImmediate(now, flags | STAGE_INVALIDATE);
- }
-
- /**
- * Schedule an immediate update for this stage.
- */
- private void scheduleImmediate(long now, int flags) {
- // In case an immediate update is already scheduled, no need to do anything
- if ((flags & STAGE_UPDATE_PENDING) != 0) {
- this.flags = flags;
- return;
- }
-
- // Mark the stage that there is an update pending
- this.flags = flags | STAGE_UPDATE_PENDING;
-
- engine.scheduleImmediate(now, this);
- }
-
- /**
- * Start the stage.
- */
- private void doStart(long now, int flags) {
- // Update state before calling into the outside world, so it observes a consistent state
- flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE;
-
- doUpdate(now, flags);
- }
-
- /**
- * Update the state of the stage.
- */
- private void doUpdate(long now, int flags) {
- long deadline = this.deadline;
- long newDeadline = deadline;
-
- // Update the stage if:
- // (1) the timer of the stage has expired.
- // (2) one of the input ports is pushed,
- // (3) one of the output ports is pulled,
- if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) {
- // Update state before calling into the outside world, so it observes a consistent state
- this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE;
-
- try {
- newDeadline = logic.onUpdate(this, now);
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = this.flags;
- } catch (Exception e) {
- doFail(e);
- }
- }
-
- // Check whether the stage is marked as closing.
- if ((flags & STAGE_CLOSE) != 0) {
- doClose(flags, null);
-
- // IMPORTANT: Re-fetch the flags after the callback might have changed those
- flags = this.flags;
- }
-
- // Indicate that no update is active anymore and flush the flags
- this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING);
- this.deadline = newDeadline;
-
- // Update the timer queue with the new deadline
- engine.scheduleDelayedInContext(this);
- }
-
- /**
- * This method is invoked when an uncaught exception is caught by the engine. When this happens, the
- * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs.
- */
- void doFail(Throwable cause) {
- LOGGER.warn("Uncaught exception (closing stage)", cause);
-
- doClose(flags, cause);
- }
-
- /**
- * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure.
- */
- private void doClose(int flags, Throwable cause) {
- // Mark the stage as closed
- this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED;
-
- // Remove stage from parent graph
- parentGraph.detach(this);
-
- // Remove stage from the timer queue
- setDeadline(Long.MAX_VALUE);
-
- // Cancel all input ports
- for (InPort port : inlets.values()) {
- if (port != null) {
- port.cancel(cause);
- }
- }
-
- // Cancel all output ports
- for (OutPort port : outlets.values()) {
- if (port != null) {
- port.fail(cause);
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java
deleted file mode 100644
index 70986a35..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via
- * out-going flows based on its potential inputs.
- */
-public interface FlowStageLogic {
- /**
- * This method is invoked when the one of the stage's inlets or outlets is invalidated.
- *
- * @param ctx The context in which the stage runs.
- * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring.
- * @return The next deadline for the stage.
- */
- long onUpdate(FlowStage ctx, long now);
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java
deleted file mode 100644
index 839b01db..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}.
- */
-public interface InHandler {
- /**
- * Return the actual flow rate over the input port.
- *
- * @param port The input port to which the flow was pushed.
- * @return The actual flow rate over the port.
- */
- default float getRate(InPort port) {
- return Math.min(port.getDemand(), port.getCapacity());
- }
-
- /**
- * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet.
- *
- * @param port The input port to which the flow was pushed.
- * @param demand The rate of flow the output attempted to push to the port.
- */
- void onPush(InPort port, float demand);
-
- /**
- * This method is invoked when the input port is finished.
- *
- * @param port The input port that has finished.
- * @param cause The cause of the input port being finished or <code>null</code> if the port completed successfully.
- */
- void onUpstreamFinish(InPort port, Throwable cause);
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java
deleted file mode 100644
index 9d5b4bef..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * A collection of common {@link InHandler} implementations.
- */
-public class InHandlers {
- /**
- * Prevent construction of this class.
- */
- private InHandlers() {}
-
- /**
- * Return an {@link InHandler} that does nothing.
- */
- public static InHandler noop() {
- return NoopInHandler.INSTANCE;
- }
-
- /**
- * No-op implementation of {@link InHandler}.
- */
- private static final class NoopInHandler implements InHandler {
- public static final InHandler INSTANCE = new NoopInHandler();
-
- @Override
- public void onPush(InPort port, float demand) {}
-
- @Override
- public void onUpstreamFinish(InPort port, Throwable cause) {}
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
deleted file mode 100644
index 16fed4eb..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-import java.time.InstantSource;
-import java.util.Objects;
-
-/**
- * A port that consumes a flow.
- * <p>
- * Input ports are represented as in-going edges in the flow graph.
- */
-public final class InPort implements Inlet {
- private final int id;
-
- private float capacity;
- private float demand;
-
- private boolean mask;
-
- OutPort output;
- private InHandler handler = InHandlers.noop();
- private final InstantSource clock;
- private final String name;
- private final FlowStage stage;
-
- InPort(FlowStage stage, String name, int id) {
- this.name = name;
- this.id = id;
- this.stage = stage;
- this.clock = stage.clock;
- }
-
- @Override
- public FlowGraph getGraph() {
- return stage.parentGraph;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- /**
- * Return the identifier of the {@link InPort} with respect to its stage.
- */
- public int getId() {
- return id;
- }
-
- /**
- * Return the current capacity of the input port.
- */
- public float getCapacity() {
- return capacity;
- }
-
- /**
- * Return the current demand of flow of the input port.
- */
- public float getDemand() {
- return demand;
- }
-
- /**
- * Return the current rate of flow of the input port.
- */
- public float getRate() {
- return handler.getRate(this);
- }
-
- /**
- * Pull the flow with the specified <code>capacity</code> from the input port.
- *
- * @param capacity The maximum throughput that the stage can receive from the input port.
- */
- public void pull(float capacity) {
- this.capacity = capacity;
-
- OutPort output = this.output;
- if (output != null) {
- output.pull(capacity);
- }
- }
-
- /**
- * Return the current {@link InHandler} of the input port.
- */
- public InHandler getHandler() {
- return handler;
- }
-
- /**
- * Set the {@link InHandler} of the input port.
- */
- public void setHandler(InHandler handler) {
- this.handler = handler;
- }
-
- /**
- * Return the mask of this port.
- * <p>
- * Stages ignore events originating from masked ports.
- */
- public boolean getMask() {
- return mask;
- }
-
- /**
- * (Un)mask the port.
- */
- public void setMask(boolean mask) {
- this.mask = mask;
- }
-
- /**
- * Disconnect the input port from its (potentially) connected outlet.
- * <p>
- * The inlet can still be used and re-connected to another outlet.
- *
- * @param cause The cause for disconnecting the port or <code>null</code> when no more flow is needed.
- */
- public void cancel(Throwable cause) {
- demand = 0.f;
-
- OutPort output = this.output;
- if (output != null) {
- this.output = null;
- output.input = null;
- output.cancel(cause);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- InPort port = (InPort) o;
- return stage.equals(port.stage) && name.equals(port.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(stage.parentGraph, name);
- }
-
- /**
- * This method is invoked when the inlet is connected to an outlet.
- */
- void connect() {
- OutPort output = this.output;
- output.pull(capacity);
- }
-
- /**
- * Push a flow from an outlet to this inlet.
- *
- * @param demand The rate of flow to push.
- */
- void push(float demand) {
- // No-op when the rate is unchanged
- if (this.demand == demand) {
- return;
- }
-
- try {
- handler.onPush(this, demand);
- this.demand = demand;
-
- if (!mask) {
- stage.invalidate(clock.millis());
- }
- } catch (Exception e) {
- stage.doFail(e);
- }
- }
-
- /**
- * This method is invoked by the connected {@link OutPort} when it finishes.
- */
- void finish(Throwable cause) {
- try {
- long now = clock.millis();
- handler.onUpstreamFinish(this, cause);
- this.demand = 0.f;
-
- if (!mask) {
- stage.invalidate(now);
- }
- } catch (Exception e) {
- stage.doFail(e);
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java
deleted file mode 100644
index 723c6d6b..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}.
- */
-public interface OutHandler {
- /**
- * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet.
- *
- * @param port The output port of which the capacity was changed.
- * @param capacity The new capacity of the outlet.
- */
- void onPull(OutPort port, float capacity);
-
- /**
- * This method is invoked when the output port no longer accepts any flow.
- * <p>
- * After this callback no other callbacks will be called for this port.
- *
- * @param port The outlet that no longer accepts any flow.
- * @param cause The cause of the output port no longer accepting any flow or <code>null</code> if the port closed
- * successfully.
- */
- void onDownstreamFinish(OutPort port, Throwable cause);
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java
deleted file mode 100644
index 8fbfda0d..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * A collection of common {@link OutHandler} implementations.
- */
-public class OutHandlers {
- /**
- * Prevent construction of this class.
- */
- private OutHandlers() {}
-
- /**
- * Return an {@link OutHandler} that does nothing.
- */
- public static OutHandler noop() {
- return NoopOutHandler.INSTANCE;
- }
-
- /**
- * No-op implementation of {@link OutHandler}.
- */
- private static final class NoopOutHandler implements OutHandler {
- public static final OutHandler INSTANCE = new NoopOutHandler();
-
- @Override
- public void onPull(OutPort port, float capacity) {}
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {}
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
deleted file mode 100644
index 1f7ed4ee..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-import java.time.InstantSource;
-import java.util.Objects;
-
-/**
- * A port that outputs a flow.
- * <p>
- * Output ports are represented as out-going edges in the flow graph.
- */
-public final class OutPort implements Outlet {
- private final int id;
-
- private float capacity;
- private float demand;
-
- private boolean mask;
-
- InPort input;
- private OutHandler handler = OutHandlers.noop();
- private final String name;
- private final FlowStage stage;
- private final InstantSource clock;
-
- OutPort(FlowStage stage, String name, int id) {
- this.name = name;
- this.id = id;
- this.stage = stage;
- this.clock = stage.clock;
- }
-
- @Override
- public FlowGraph getGraph() {
- return stage.parentGraph;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- /**
- * Return the identifier of the {@link OutPort} with respect to its stage.
- */
- public int getId() {
- return id;
- }
-
- /**
- * Return the capacity of the output port.
- */
- public float getCapacity() {
- return capacity;
- }
-
- /**
- * Return the current demand of flow of the output port.
- */
- public float getDemand() {
- return demand;
- }
-
- /**
- * Return the current rate of flow of the input port.
- */
- public float getRate() {
- InPort input = this.input;
- if (input != null) {
- return input.getRate();
- }
-
- return 0.f;
- }
-
- /**
- * Return the current {@link OutHandler} of the output port.
- */
- public OutHandler getHandler() {
- return handler;
- }
-
- /**
- * Set the {@link OutHandler} of the output port.
- */
- public void setHandler(OutHandler handler) {
- this.handler = handler;
- }
-
- /**
- * Return the mask of this port.
- * <p>
- * Stages ignore events originating from masked ports.
- */
- public boolean getMask() {
- return mask;
- }
-
- /**
- * (Un)mask the port.
- */
- public void setMask(boolean mask) {
- this.mask = mask;
- }
-
- /**
- * Push the given flow rate over output port.
- *
- * @param rate The rate of the flow to push.
- */
- public void push(float rate) {
- demand = rate;
- InPort input = this.input;
-
- if (input != null) {
- input.push(rate);
- }
- }
-
- /**
- * Signal to the downstream port that the output has completed successfully and disconnect the port from its input.
- * <p>
- * The output port can still be used and re-connected to another input.
- */
- public void complete() {
- fail(null);
- }
-
- /**
- * Signal a failure to the downstream port and disconnect the port from its input.
- * <p>
- * The output can still be used and re-connected to another input.
- */
- public void fail(Throwable cause) {
- capacity = 0.f;
-
- InPort input = this.input;
- if (input != null) {
- this.input = null;
- input.output = null;
- input.finish(cause);
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- OutPort port = (OutPort) o;
- return stage.equals(port.stage) && name.equals(port.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(stage.parentGraph, name);
- }
-
- /**
- * This method is invoked when the outlet is connected to an inlet.
- */
- void connect() {
- input.push(demand);
- }
-
- /**
- * Pull from this outlet with a specified capacity.
- *
- * @param capacity The capacity of the inlet.
- */
- void pull(float capacity) {
- // No-op when outlet is not active or the rate is unchanged
- if (this.capacity == capacity) {
- return;
- }
-
- try {
- handler.onPull(this, capacity);
- this.capacity = capacity;
-
- if (!mask) {
- stage.invalidate(clock.millis());
- }
- } catch (Exception e) {
- stage.doFail(e);
- }
- }
-
- /**
- * This method is invoked by the connected {@link InPort} when downstream cancels the connection.
- */
- void cancel(Throwable cause) {
- try {
- handler.onDownstreamFinish(this, cause);
- this.capacity = 0.f;
-
- if (!mask) {
- stage.invalidate(clock.millis());
- }
- } catch (Exception e) {
- stage.doFail(e);
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java
deleted file mode 100644
index 32e19a3b..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2;
-
-/**
- * An out-going edge in a {@link FlowGraph}.
- */
-public interface Outlet {
- /**
- * Return the {@link FlowGraph} to which the outlet is exposed.
- */
- FlowGraph getGraph();
-
- /**
- * Return the name of the outlet.
- */
- String getName();
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java
deleted file mode 100644
index dec98955..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.mux;
-
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.Inlet;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs.
- */
-public interface FlowMultiplexer {
- /**
- * Return maximum number of inputs supported by the multiplexer.
- */
- int getMaxInputs();
-
- /**
- * Return maximum number of outputs supported by the multiplexer.
- */
- int getMaxOutputs();
-
- /**
- * Return the number of active inputs on this multiplexer.
- */
- int getInputCount();
-
- /**
- * Allocate a new input on this multiplexer with the specified capacity..
- *
- * @return The identifier of the input for this stage.
- */
- Inlet newInput();
-
- /**
- * Release the input at the specified slot.
- *
- * @param inlet The inlet to release.
- */
- void releaseInput(Inlet inlet);
-
- /**
- * Return the number of active outputs on this multiplexer.
- */
- int getOutputCount();
-
- /**
- * Allocate a new output on this multiplexer.
- *
- * @return The outlet for this stage.
- */
- Outlet newOutput();
-
- /**
- * Release the output at the specified slot.
- *
- * @param outlet The outlet to release.
- */
- void releaseOutput(Outlet outlet);
-
- /**
- * Return the total input capacity of the {@link FlowMultiplexer}.
- */
- float getCapacity();
-
- /**
- * Return the total input demand for the {@link FlowMultiplexer}.
- */
- float getDemand();
-
- /**
- * Return the total input rate for the {@link FlowMultiplexer}.
- */
- float getRate();
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java
deleted file mode 100644
index 0b5b9141..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.mux;
-
-import org.opendc.simulator.flow2.FlowGraph;
-
-/**
- * Factory interface for a {@link FlowMultiplexer} implementation.
- */
-public interface FlowMultiplexerFactory {
- /**
- * Construct a new {@link FlowMultiplexer} belonging to the specified {@link FlowGraph}.
- *
- * @param graph The graph to which the multiplexer belongs.
- */
- FlowMultiplexer newMultiplexer(FlowGraph graph);
-
- /**
- * Return a {@link FlowMultiplexerFactory} for {@link ForwardingFlowMultiplexer} instances.
- */
- static FlowMultiplexerFactory forwardingMultiplexer() {
- return ForwardingFlowMultiplexer.FACTORY;
- }
-
- /**
- * Return a {@link FlowMultiplexerFactory} for {@link MaxMinFlowMultiplexer} instances.
- */
- static FlowMultiplexerFactory maxMinMultiplexer() {
- return MaxMinFlowMultiplexer.FACTORY;
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java
deleted file mode 100644
index e0564cd2..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.mux;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.InHandler;
-import org.opendc.simulator.flow2.InPort;
-import org.opendc.simulator.flow2.Inlet;
-import org.opendc.simulator.flow2.OutHandler;
-import org.opendc.simulator.flow2.OutPort;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively.
- * This means that a single input is directly connected to an output and that the multiplexer can only support as many
- * inputs as outputs.
- */
-public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic {
- /**
- * Factory implementation for this implementation.
- */
- static FlowMultiplexerFactory FACTORY = ForwardingFlowMultiplexer::new;
-
- public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler();
- public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler();
-
- private final FlowStage stage;
-
- private InPort[] inlets;
- private OutPort[] outlets;
- private final BitSet activeInputs;
- private final BitSet activeOutputs;
- private final BitSet availableOutputs;
-
- private float capacity = 0.f;
- private float demand = 0.f;
-
- public ForwardingFlowMultiplexer(FlowGraph graph) {
- this.stage = graph.newStage(this);
-
- this.inlets = new InPort[4];
- this.activeInputs = new BitSet();
- this.outlets = new OutPort[4];
- this.activeOutputs = new BitSet();
- this.availableOutputs = new BitSet();
- }
-
- @Override
- public float getCapacity() {
- return capacity;
- }
-
- @Override
- public float getDemand() {
- return demand;
- }
-
- @Override
- public float getRate() {
- final BitSet activeOutputs = this.activeOutputs;
- final OutPort[] outlets = this.outlets;
- float rate = 0.f;
- for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) {
- rate += outlets[i].getRate();
- }
- return rate;
- }
-
- @Override
- public int getMaxInputs() {
- return getOutputCount();
- }
-
- @Override
- public int getMaxOutputs() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public int getInputCount() {
- return activeInputs.length();
- }
-
- @Override
- public Inlet newInput() {
- final BitSet activeInputs = this.activeInputs;
- int slot = activeInputs.nextClearBit(0);
-
- InPort inPort = stage.getInlet("in" + slot);
- inPort.setMask(true);
-
- InPort[] inlets = this.inlets;
- if (slot >= inlets.length) {
- int newLength = inlets.length + (inlets.length >> 1);
- inlets = Arrays.copyOf(inlets, newLength);
- this.inlets = inlets;
- }
-
- final BitSet availableOutputs = this.availableOutputs;
- int outSlot = availableOutputs.nextSetBit(0);
-
- if (outSlot < 0) {
- throw new IllegalStateException("No capacity available for a new input");
- }
-
- inlets[slot] = inPort;
- activeInputs.set(slot);
-
- OutPort outPort = outlets[outSlot];
- availableOutputs.clear(outSlot);
-
- inPort.setHandler(new ForwardingInHandler(outPort));
- outPort.setHandler(new ForwardingOutHandler(inPort));
-
- inPort.pull(outPort.getCapacity());
-
- return inPort;
- }
-
- @Override
- public void releaseInput(Inlet inlet) {
- InPort port = (InPort) inlet;
- int slot = port.getId();
-
- final BitSet activeInputs = this.activeInputs;
-
- if (!activeInputs.get(slot)) {
- return;
- }
-
- port.cancel(null);
- activeInputs.clear(slot);
-
- ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler();
- availableOutputs.set(inHandler.output.getId());
-
- port.setHandler(IDLE_IN_HANDLER);
- }
-
- @Override
- public int getOutputCount() {
- return activeOutputs.length();
- }
-
- @Override
- public Outlet newOutput() {
- final BitSet activeOutputs = this.activeOutputs;
- int slot = activeOutputs.nextClearBit(0);
-
- OutPort port = stage.getOutlet("out" + slot);
- OutPort[] outlets = this.outlets;
- if (slot >= outlets.length) {
- int newLength = outlets.length + (outlets.length >> 1);
- outlets = Arrays.copyOf(outlets, newLength);
- this.outlets = outlets;
- }
- outlets[slot] = port;
-
- activeOutputs.set(slot);
- availableOutputs.set(slot);
-
- port.setHandler(IDLE_OUT_HANDLER);
-
- return port;
- }
-
- @Override
- public void releaseOutput(Outlet outlet) {
- OutPort port = (OutPort) outlet;
- int slot = port.getId();
- activeInputs.clear(slot);
- availableOutputs.clear(slot);
- port.complete();
-
- port.setHandler(IDLE_OUT_HANDLER);
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- return Long.MAX_VALUE;
- }
-
- class ForwardingInHandler implements InHandler {
- final OutPort output;
-
- ForwardingInHandler(OutPort output) {
- this.output = output;
- }
-
- @Override
- public float getRate(InPort port) {
- return output.getRate();
- }
-
- @Override
- public void onPush(InPort port, float rate) {
- ForwardingFlowMultiplexer.this.demand += -port.getDemand() + rate;
-
- output.push(rate);
- }
-
- @Override
- public void onUpstreamFinish(InPort port, Throwable cause) {
- ForwardingFlowMultiplexer.this.demand -= port.getDemand();
-
- final OutPort output = this.output;
- output.push(0.f);
-
- releaseInput(port);
- }
- }
-
- private class ForwardingOutHandler implements OutHandler {
- private final InPort input;
-
- ForwardingOutHandler(InPort input) {
- this.input = input;
- }
-
- @Override
- public void onPull(OutPort port, float capacity) {
- ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity;
-
- input.pull(capacity);
- }
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- ForwardingFlowMultiplexer.this.capacity -= port.getCapacity();
-
- input.cancel(cause);
-
- releaseOutput(port);
- }
- }
-
- private static class IdleInHandler implements InHandler {
- @Override
- public float getRate(InPort port) {
- return 0.f;
- }
-
- @Override
- public void onPush(InPort port, float rate) {
- port.cancel(new IllegalStateException("Inlet is not allocated"));
- }
-
- @Override
- public void onUpstreamFinish(InPort port, Throwable cause) {}
- }
-
- private class IdleOutHandler implements OutHandler {
- @Override
- public void onPull(OutPort port, float capacity) {
- ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity;
- }
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- ForwardingFlowMultiplexer.this.capacity -= port.getCapacity();
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java
deleted file mode 100644
index ac5c4f5c..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.mux;
-
-import java.util.Arrays;
-import java.util.BitSet;
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.InHandler;
-import org.opendc.simulator.flow2.InPort;
-import org.opendc.simulator.flow2.Inlet;
-import org.opendc.simulator.flow2.OutHandler;
-import org.opendc.simulator.flow2.OutPort;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs
- * using max-min fair sharing.
- * <p>
- * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined
- * output capacity, but allows individual inputs to use more capacity if there is still capacity left.
- */
-public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic {
- /**
- * Factory implementation for this implementation.
- */
- static FlowMultiplexerFactory FACTORY = MaxMinFlowMultiplexer::new;
-
- private final FlowStage stage;
- private final BitSet activeInputs;
- private final BitSet activeOutputs;
-
- private float capacity = 0.f;
- private float demand = 0.f;
- private float rate = 0.f;
-
- private InPort[] inlets;
- private long[] inputs;
- private float[] rates;
- private OutPort[] outlets;
-
- private final MultiplexerInHandler inHandler = new MultiplexerInHandler();
- private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler();
-
- /**
- * Construct a {@link MaxMinFlowMultiplexer} instance.
- *
- * @param graph The {@link FlowGraph} to add the multiplexer to.
- */
- public MaxMinFlowMultiplexer(FlowGraph graph) {
- this.stage = graph.newStage(this);
- this.activeInputs = new BitSet();
- this.activeOutputs = new BitSet();
-
- this.inlets = new InPort[4];
- this.inputs = new long[4];
- this.rates = new float[4];
- this.outlets = new OutPort[4];
- }
-
- @Override
- public float getCapacity() {
- return capacity;
- }
-
- @Override
- public float getDemand() {
- return demand;
- }
-
- @Override
- public float getRate() {
- return rate;
- }
-
- @Override
- public int getMaxInputs() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public int getMaxOutputs() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- float capacity = this.capacity;
- float demand = this.demand;
- float rate = demand;
-
- if (demand > capacity) {
- rate = redistributeCapacity(inlets, inputs, rates, capacity);
- }
-
- if (this.rate != rate) {
- // Only update the outputs if the output rate has changed
- this.rate = rate;
-
- changeRate(activeOutputs, outlets, capacity, rate);
- }
-
- return Long.MAX_VALUE;
- }
-
- @Override
- public int getInputCount() {
- return activeInputs.length();
- }
-
- @Override
- public Inlet newInput() {
- final BitSet activeInputs = this.activeInputs;
- int slot = activeInputs.nextClearBit(0);
-
- InPort port = stage.getInlet("in" + slot);
- port.setHandler(inHandler);
- port.pull(this.capacity);
-
- InPort[] inlets = this.inlets;
- if (slot >= inlets.length) {
- int newLength = inlets.length + (inlets.length >> 1);
- inlets = Arrays.copyOf(inlets, newLength);
- inputs = Arrays.copyOf(inputs, newLength);
- rates = Arrays.copyOf(rates, newLength);
- this.inlets = inlets;
- }
- inlets[slot] = port;
-
- activeInputs.set(slot);
- return port;
- }
-
- @Override
- public void releaseInput(Inlet inlet) {
- InPort port = (InPort) inlet;
-
- activeInputs.clear(port.getId());
- port.cancel(null);
- }
-
- @Override
- public int getOutputCount() {
- return activeOutputs.length();
- }
-
- @Override
- public Outlet newOutput() {
- final BitSet activeOutputs = this.activeOutputs;
- int slot = activeOutputs.nextClearBit(0);
-
- OutPort port = stage.getOutlet("out" + slot);
- port.setHandler(outHandler);
-
- OutPort[] outlets = this.outlets;
- if (slot >= outlets.length) {
- int newLength = outlets.length + (outlets.length >> 1);
- outlets = Arrays.copyOf(outlets, newLength);
- this.outlets = outlets;
- }
- outlets[slot] = port;
-
- activeOutputs.set(slot);
- return port;
- }
-
- @Override
- public void releaseOutput(Outlet outlet) {
- OutPort port = (OutPort) outlet;
- activeInputs.clear(port.getId());
- port.complete();
- }
-
- /**
- * Helper function to redistribute the specified capacity across the inlets.
- */
- private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) {
- // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
- // constrained capacity across the inputs.
- for (int i = 0; i < inputs.length; i++) {
- InPort inlet = inlets[i];
- if (inlet == null) {
- break;
- }
-
- inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL);
- }
- Arrays.sort(inputs);
-
- float availableCapacity = capacity;
- int inputSize = inputs.length;
-
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- for (int i = 0; i < inputs.length; i++) {
- long v = inputs[i];
- int slot = (int) v;
- float d = Float.intBitsToFloat((int) (v >> 32));
-
- if (d == 0.0) {
- continue;
- }
-
- float availableShare = availableCapacity / (inputSize - i);
- float r = Math.min(d, availableShare);
-
- rates[slot] = r;
- availableCapacity -= r;
- }
-
- return capacity - availableCapacity;
- }
-
- /**
- * Helper method to change the rate of the outlets.
- */
- private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) {
- // Divide the requests over the available capacity of the input resources fairly
- for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) {
- OutPort outlet = outlets[i];
- float fraction = outlet.getCapacity() / capacity;
- outlet.push(rate * fraction);
- }
- }
-
- /**
- * A {@link InHandler} implementation for the multiplexer inputs.
- */
- private class MultiplexerInHandler implements InHandler {
- @Override
- public float getRate(InPort port) {
- return rates[port.getId()];
- }
-
- @Override
- public void onPush(InPort port, float demand) {
- MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand;
- rates[port.getId()] = demand;
- }
-
- @Override
- public void onUpstreamFinish(InPort port, Throwable cause) {
- MaxMinFlowMultiplexer.this.demand -= port.getDemand();
- releaseInput(port);
- rates[port.getId()] = 0.f;
- }
- }
-
- /**
- * A {@link OutHandler} implementation for the multiplexer outputs.
- */
- private class MultiplexerOutHandler implements OutHandler {
- @Override
- public void onPull(OutPort port, float capacity) {
- float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity;
- MaxMinFlowMultiplexer.this.capacity = newCapacity;
- changeInletCapacity(newCapacity);
- }
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity();
- MaxMinFlowMultiplexer.this.capacity = newCapacity;
- releaseOutput(port);
- changeInletCapacity(newCapacity);
- }
-
- private void changeInletCapacity(float capacity) {
- BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs;
- InPort[] inlets = MaxMinFlowMultiplexer.this.inlets;
-
- for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) {
- inlets[i].pull(capacity);
- }
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java
deleted file mode 100644
index 69c94708..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.sink;
-
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.Inlet;
-
-/**
- * A {@link FlowStage} with a single input.
- */
-public interface FlowSink {
- /**
- * Return the input of this {@link FlowSink}.
- */
- Inlet getInput();
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java
deleted file mode 100644
index fdfe5ee8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.sink;
-
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.InHandler;
-import org.opendc.simulator.flow2.InPort;
-import org.opendc.simulator.flow2.Inlet;
-
-/**
- * A sink with a fixed capacity.
- */
-public final class SimpleFlowSink implements FlowSink, FlowStageLogic {
- private final FlowStage stage;
- private final InPort input;
- private final Handler handler;
-
- /**
- * Construct a new {@link SimpleFlowSink} with the specified initial capacity.
- *
- * @param graph The graph to add the sink to.
- * @param initialCapacity The initial capacity of the sink.
- */
- public SimpleFlowSink(FlowGraph graph, float initialCapacity) {
- this.stage = graph.newStage(this);
- this.handler = new Handler();
- this.input = stage.getInlet("in");
- this.input.pull(initialCapacity);
- this.input.setMask(true);
- this.input.setHandler(handler);
- }
-
- /**
- * Return the {@link Inlet} of this sink.
- */
- @Override
- public Inlet getInput() {
- return input;
- }
-
- /**
- * Return the capacity of the sink.
- */
- public float getCapacity() {
- return input.getCapacity();
- }
-
- /**
- * Update the capacity of the sink.
- *
- * @param capacity The new capacity to update the sink to.
- */
- public void setCapacity(float capacity) {
- input.pull(capacity);
- stage.invalidate();
- }
-
- /**
- * Return the flow rate of the sink.
- */
- public float getRate() {
- return input.getRate();
- }
-
- /**
- * Remove this node from the graph.
- */
- public void close() {
- stage.close();
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- InPort input = this.input;
- handler.rate = Math.min(input.getDemand(), input.getCapacity());
- return Long.MAX_VALUE;
- }
-
- /**
- * The {@link InHandler} implementation for the sink.
- */
- private static final class Handler implements InHandler {
- float rate;
-
- @Override
- public float getRate(InPort port) {
- return rate;
- }
-
- @Override
- public void onPush(InPort port, float demand) {
- float capacity = port.getCapacity();
- rate = Math.min(demand, capacity);
- }
-
- @Override
- public void onUpstreamFinish(InPort port, Throwable cause) {
- rate = 0.f;
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java
deleted file mode 100644
index 2dcc66e4..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.source;
-
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.OutPort;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * An empty {@link FlowSource}.
- */
-public final class EmptyFlowSource implements FlowSource, FlowStageLogic {
- private final FlowStage stage;
- private final OutPort output;
-
- /**
- * Construct a new {@link EmptyFlowSource}.
- */
- public EmptyFlowSource(FlowGraph graph) {
- this.stage = graph.newStage(this);
- this.output = stage.getOutlet("out");
- }
-
- /**
- * Return the {@link Outlet} of the source.
- */
- @Override
- public Outlet getOutput() {
- return output;
- }
-
- /**
- * Remove this node from the graph.
- */
- public void close() {
- stage.close();
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- return Long.MAX_VALUE;
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java
deleted file mode 100644
index c09987cd..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.source;
-
-import java.util.function.Consumer;
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.OutHandler;
-import org.opendc.simulator.flow2.OutPort;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization.
- */
-public class RuntimeFlowSource implements FlowSource, FlowStageLogic {
- private final float utilization;
-
- private final FlowStage stage;
- private final OutPort output;
- private final Consumer<RuntimeFlowSource> completionHandler;
-
- private long duration;
- private long lastPull;
-
- /**
- * Construct a {@link RuntimeFlowSource} instance.
- *
- * @param graph The {@link FlowGraph} to which this source belongs.
- * @param duration The duration of the source.
- * @param utilization The utilization of the capacity of the outlet.
- * @param completionHandler A callback invoked when the source completes.
- */
- public RuntimeFlowSource(
- FlowGraph graph, long duration, float utilization, Consumer<RuntimeFlowSource> completionHandler) {
- if (duration <= 0) {
- throw new IllegalArgumentException("Duration must be positive and non-zero");
- }
-
- if (utilization <= 0.0) {
- throw new IllegalArgumentException("Utilization must be positive and non-zero");
- }
-
- this.stage = graph.newStage(this);
- this.output = stage.getOutlet("out");
- this.output.setHandler(new OutHandler() {
- @Override
- public void onPull(OutPort port, float capacity) {}
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- // Source cannot complete without re-connecting to another sink, so mark the source as completed
- completionHandler.accept(RuntimeFlowSource.this);
- }
- });
- this.duration = duration;
- this.utilization = utilization;
- this.completionHandler = completionHandler;
- this.lastPull = graph.getEngine().getClock().millis();
- }
-
- /**
- * Construct a new {@link RuntimeFlowSource}.
- *
- * @param graph The {@link FlowGraph} to which this source belongs.
- * @param duration The duration of the source.
- * @param utilization The utilization of the capacity of the outlet.
- */
- public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) {
- this(graph, duration, utilization, RuntimeFlowSource::close);
- }
-
- /**
- * Return the {@link Outlet} of the source.
- */
- @Override
- public Outlet getOutput() {
- return output;
- }
-
- /**
- * Remove this node from the graph.
- */
- public void close() {
- stage.close();
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- long lastPull = this.lastPull;
- this.lastPull = now;
-
- long delta = Math.max(0, now - lastPull);
-
- OutPort output = this.output;
- float limit = output.getCapacity() * utilization;
- long duration = this.duration - delta;
-
- if (duration <= 0) {
- completionHandler.accept(this);
- return Long.MAX_VALUE;
- }
-
- this.duration = duration;
- output.push(limit);
- return now + duration;
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java
deleted file mode 100644
index a0e9cb9d..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.source;
-
-import java.util.function.Consumer;
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.OutHandler;
-import org.opendc.simulator.flow2.OutPort;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * A flow source that contains a fixed amount and is pushed with a given utilization.
- */
-public final class SimpleFlowSource implements FlowSource, FlowStageLogic {
- private final float utilization;
- private float remainingAmount;
- private long lastPull;
-
- private final FlowStage stage;
- private final OutPort output;
- private final Consumer<SimpleFlowSource> completionHandler;
-
- /**
- * Construct a new {@link SimpleFlowSource}.
- *
- * @param graph The {@link FlowGraph} to which this source belongs.
- * @param amount The amount to transfer via the outlet.
- * @param utilization The utilization of the capacity of the outlet.
- * @param completionHandler A callback invoked when the source completes.
- */
- public SimpleFlowSource(
- FlowGraph graph, float amount, float utilization, Consumer<SimpleFlowSource> completionHandler) {
- if (amount < 0.0) {
- throw new IllegalArgumentException("Amount must be non-negative");
- }
-
- if (utilization <= 0.0) {
- throw new IllegalArgumentException("Utilization must be positive and non-zero");
- }
-
- this.stage = graph.newStage(this);
- this.output = stage.getOutlet("out");
- this.output.setHandler(new OutHandler() {
- @Override
- public void onPull(OutPort port, float capacity) {}
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- // Source cannot complete without re-connecting to another sink, so mark the source as completed
- completionHandler.accept(SimpleFlowSource.this);
- }
- });
- this.completionHandler = completionHandler;
- this.utilization = utilization;
- this.remainingAmount = amount;
- this.lastPull = graph.getEngine().getClock().millis();
- }
-
- /**
- * Construct a new {@link SimpleFlowSource}.
- *
- * @param graph The {@link FlowGraph} to which this source belongs.
- * @param amount The amount to transfer via the outlet.
- * @param utilization The utilization of the capacity of the outlet.
- */
- public SimpleFlowSource(FlowGraph graph, float amount, float utilization) {
- this(graph, amount, utilization, SimpleFlowSource::close);
- }
-
- /**
- * Return the {@link Outlet} of the source.
- */
- @Override
- public Outlet getOutput() {
- return output;
- }
-
- /**
- * Remove this node from the graph.
- */
- public void close() {
- stage.close();
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- long lastPull = this.lastPull;
- this.lastPull = now;
-
- long delta = Math.max(0, now - lastPull);
-
- OutPort output = this.output;
- float consumed = output.getRate() * delta / 1000.f;
- float limit = output.getCapacity() * utilization;
-
- float remainingAmount = this.remainingAmount - consumed;
- this.remainingAmount = remainingAmount;
-
- long duration = (long) Math.ceil(remainingAmount / limit * 1000);
-
- if (duration <= 0) {
- completionHandler.accept(this);
- return Long.MAX_VALUE;
- }
-
- output.push(limit);
- return now + duration;
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java
deleted file mode 100644
index e8abc2d7..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.source;
-
-import java.util.function.Consumer;
-import org.opendc.simulator.flow2.FlowGraph;
-import org.opendc.simulator.flow2.FlowStage;
-import org.opendc.simulator.flow2.FlowStageLogic;
-import org.opendc.simulator.flow2.OutHandler;
-import org.opendc.simulator.flow2.OutPort;
-import org.opendc.simulator.flow2.Outlet;
-
-/**
- * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time.
- */
-public final class TraceFlowSource implements FlowSource, FlowStageLogic {
- private final OutPort output;
- private final long[] deadlines;
- private final float[] usages;
- private final int size;
- private int index;
-
- private final FlowStage stage;
- private final Consumer<TraceFlowSource> completionHandler;
-
- /**
- * Construct a {@link TraceFlowSource}.
- *
- * @param graph The {@link FlowGraph} to which the source belongs.
- * @param trace The {@link Trace} to replay.
- * @param completionHandler The completion handler to invoke when the source finishes.
- */
- public TraceFlowSource(FlowGraph graph, Trace trace, Consumer<TraceFlowSource> completionHandler) {
- this.stage = graph.newStage(this);
- this.output = stage.getOutlet("out");
- this.output.setHandler(new OutHandler() {
- @Override
- public void onPull(OutPort port, float capacity) {}
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- // Source cannot complete without re-connecting to another sink, so mark the source as completed
- completionHandler.accept(TraceFlowSource.this);
- }
- });
- this.deadlines = trace.deadlines;
- this.usages = trace.usages;
- this.size = trace.size;
- this.completionHandler = completionHandler;
- }
-
- /**
- * Construct a {@link TraceFlowSource}.
- *
- * @param graph The {@link FlowGraph} to which the source belongs.
- * @param trace The {@link Trace} to replay.
- */
- public TraceFlowSource(FlowGraph graph, Trace trace) {
- this(graph, trace, TraceFlowSource::close);
- }
-
- @Override
- public Outlet getOutput() {
- return output;
- }
-
- /**
- * Remove this node from the graph.
- */
- public void close() {
- stage.close();
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- int size = this.size;
- int index = this.index;
- long[] deadlines = this.deadlines;
- long deadline;
-
- do {
- deadline = deadlines[index];
- } while (deadline <= now && ++index < size);
-
- if (index >= size) {
- output.push(0.0f);
- completionHandler.accept(this);
- return Long.MAX_VALUE;
- }
-
- this.index = index;
- float usage = usages[index];
- output.push(usage);
-
- return deadline;
- }
-
- /**
- * A trace describes the workload over time.
- */
- public static final class Trace {
- private final long[] deadlines;
- private final float[] usages;
- private final int size;
-
- /**
- * Construct a {@link Trace}.
- *
- * @param deadlines The deadlines of the trace fragments.
- * @param usages The usages of the trace fragments.
- * @param size The size of the trace.
- */
- public Trace(long[] deadlines, float[] usages, int size) {
- this.deadlines = deadlines;
- this.usages = usages;
- this.size = size;
- }
-
- public long[] getDeadlines() {
- return deadlines;
- }
-
- public float[] getUsages() {
- return usages;
- }
-
- public int getSize() {
- return size;
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java
deleted file mode 100644
index 51ea7df3..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.util;
-
-import org.opendc.simulator.flow2.FlowGraph;
-
-/**
- * A {@link FlowTransform} describes a transformation between two components in a {@link FlowGraph} that might operate
- * at different units of flow.
- */
-public interface FlowTransform {
- /**
- * Apply the transform to the specified flow rate.
- */
- float apply(float value);
-
- /**
- * Apply the inverse of the transformation to the specified flow rate.
- */
- float applyInverse(float value);
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java
deleted file mode 100644
index 852240d8..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.util;
-
-import org.opendc.simulator.flow2.*;
-import org.opendc.simulator.flow2.sink.FlowSink;
-import org.opendc.simulator.flow2.source.FlowSource;
-
-/**
- * Helper class to transform flow from outlet to inlet.
- */
-public final class FlowTransformer implements FlowStageLogic, FlowSource, FlowSink {
- private final FlowStage stage;
- private final InPort input;
- private final OutPort output;
-
- /**
- * Construct a new {@link FlowTransformer}.
- */
- public FlowTransformer(FlowGraph graph, FlowTransform transform) {
- this.stage = graph.newStage(this);
- this.input = stage.getInlet("in");
- this.output = stage.getOutlet("out");
-
- this.input.setHandler(new ForwardInHandler(output, transform));
- this.input.setMask(true);
- this.output.setHandler(new ForwardOutHandler(input, transform));
- this.output.setMask(true);
- }
-
- /**
- * Return the {@link Outlet} of the transformer.
- */
- @Override
- public Outlet getOutput() {
- return output;
- }
-
- /**
- * Return the {@link Inlet} of the transformer.
- */
- @Override
- public Inlet getInput() {
- return input;
- }
-
- /**
- * Close the transformer.
- */
- void close() {
- stage.close();
- }
-
- @Override
- public long onUpdate(FlowStage ctx, long now) {
- return Long.MAX_VALUE;
- }
-
- private static class ForwardInHandler implements InHandler {
- private final OutPort output;
- private final FlowTransform transform;
-
- ForwardInHandler(OutPort output, FlowTransform transform) {
- this.output = output;
- this.transform = transform;
- }
-
- @Override
- public float getRate(InPort port) {
- return transform.applyInverse(output.getRate());
- }
-
- @Override
- public void onPush(InPort port, float demand) {
- float rate = transform.apply(demand);
- output.push(rate);
- }
-
- @Override
- public void onUpstreamFinish(InPort port, Throwable cause) {
- output.fail(cause);
- }
- }
-
- private static class ForwardOutHandler implements OutHandler {
- private final InPort input;
- private final FlowTransform transform;
-
- ForwardOutHandler(InPort input, FlowTransform transform) {
- this.input = input;
- this.transform = transform;
- }
-
- @Override
- public void onPull(OutPort port, float capacity) {
- input.pull(transform.applyInverse(capacity));
- }
-
- @Override
- public void onDownstreamFinish(OutPort port, Throwable cause) {
- input.cancel(cause);
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java
deleted file mode 100644
index 428dbfca..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.util;
-
-/**
- * A collection of common {@link FlowTransform} implementations.
- */
-public class FlowTransforms {
- /**
- * Prevent construction of this class.
- */
- private FlowTransforms() {}
-
- /**
- * Return a {@link FlowTransform} that forwards the flow rate unmodified.
- */
- public static FlowTransform noop() {
- return NoopFlowTransform.INSTANCE;
- }
-
- /**
- * No-op implementation of a {@link FlowTransform}.
- */
- private static final class NoopFlowTransform implements FlowTransform {
- static final NoopFlowTransform INSTANCE = new NoopFlowTransform();
-
- @Override
- public float apply(float value) {
- return value;
- }
-
- @Override
- public float applyInverse(float value) {
- return value;
- }
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt
index 2250fe87..7744d7b2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/InvocationStackTest.kt
@@ -20,12 +20,11 @@
* SOFTWARE.
*/
-package org.opendc.simulator.flow2
-
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
+import org.opendc.simulator.engine.InvocationStack
/**
* Test suite for the [InvocationStack] class.
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
deleted file mode 100644
index 413a5878..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2
-
-import io.mockk.mockk
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertNotEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer
-import org.opendc.simulator.flow2.sink.SimpleFlowSink
-import org.opendc.simulator.flow2.source.SimpleFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Smoke tests for the Flow API.
- */
-class FlowEngineTest {
- @Test
- fun testSmoke() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val multiplexer = MaxMinFlowMultiplexer(graph)
- val sink = SimpleFlowSink(graph, 2.0f)
-
- graph.connect(multiplexer.newOutput(), sink.input)
-
- val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
- val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
-
- graph.connect(sourceA.output, multiplexer.newInput())
- graph.connect(sourceB.output, multiplexer.newInput())
- }
-
- @Test
- fun testConnectInvalidInlet() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val inlet = mockk<Inlet>()
- val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
- assertThrows<IllegalArgumentException> { graph.connect(source.output, inlet) }
- }
-
- @Test
- fun testConnectInvalidOutlet() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val outlet = mockk<Outlet>()
- val sink = SimpleFlowSink(graph, 2.0f)
- assertThrows<IllegalArgumentException> { graph.connect(outlet, sink.input) }
- }
-
- @Test
- fun testConnectInletBelongsToDifferentGraph() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graphA = engine.newGraph()
- val graphB = engine.newGraph()
-
- val sink = SimpleFlowSink(graphB, 2.0f)
- val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
-
- assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
- }
-
- @Test
- fun testConnectOutletBelongsToDifferentGraph() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graphA = engine.newGraph()
- val graphB = engine.newGraph()
-
- val sink = SimpleFlowSink(graphA, 2.0f)
- val source = SimpleFlowSource(graphB, 2000.0f, 0.8f)
-
- assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) }
- }
-
- @Test
- fun testConnectInletAlreadyConnected() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sink = SimpleFlowSink(graph, 2.0f)
- val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
- val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
-
- graph.connect(sourceA.output, sink.input)
- assertThrows<IllegalStateException> { graph.connect(sourceB.output, sink.input) }
- }
-
- @Test
- fun testConnectOutletAlreadyConnected() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sinkA = SimpleFlowSink(graph, 2.0f)
- val sinkB = SimpleFlowSink(graph, 2.0f)
- val source = SimpleFlowSource(graph, 2000.0f, 0.8f)
-
- graph.connect(source.output, sinkA.input)
- assertThrows<IllegalStateException> { graph.connect(source.output, sinkB.input) }
- }
-
- @Test
- fun testDisconnectInletInvalid() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val inlet = mockk<Inlet>()
- assertThrows<IllegalArgumentException> { graph.disconnect(inlet) }
- }
-
- @Test
- fun testDisconnectOutletInvalid() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val outlet = mockk<Outlet>()
- assertThrows<IllegalArgumentException> { graph.disconnect(outlet) }
- }
-
- @Test
- fun testDisconnectInletInvalidGraph() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graphA = engine.newGraph()
- val graphB = engine.newGraph()
-
- val sink = SimpleFlowSink(graphA, 2.0f)
-
- assertThrows<IllegalArgumentException> { graphB.disconnect(sink.input) }
- }
-
- @Test
- fun testDisconnectOutletInvalidGraph() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graphA = engine.newGraph()
- val graphB = engine.newGraph()
-
- val source = SimpleFlowSource(graphA, 2000.0f, 0.8f)
-
- assertThrows<IllegalArgumentException> { graphB.disconnect(source.output) }
- }
-
- @Test
- fun testInletEquality() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sinkA = SimpleFlowSink(graph, 2.0f)
- val sinkB = SimpleFlowSink(graph, 2.0f)
-
- val multiplexer = MaxMinFlowMultiplexer(graph)
-
- assertEquals(sinkA.input, sinkA.input)
- assertNotEquals(sinkA.input, sinkB.input)
-
- assertNotEquals(multiplexer.newInput(), multiplexer.newInput())
- }
-
- @Test
- fun testOutletEquality() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
- val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f)
-
- val multiplexer = MaxMinFlowMultiplexer(graph)
-
- assertEquals(sourceA.output, sourceA.output)
- assertNotEquals(sourceA.output, sourceB.output)
-
- assertNotEquals(multiplexer.newOutput(), multiplexer.newOutput())
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
deleted file mode 100644
index 059bd5f5..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2
-
-import io.mockk.mockk
-import org.junit.jupiter.api.Assertions.assertAll
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertNull
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
-
-/**
- * Test suite for the [FlowTimerQueue] class.
- */
-class FlowTimerQueueTest {
- private lateinit var queue: FlowTimerQueue
-
- @BeforeEach
- fun setUp() {
- queue = FlowTimerQueue(3)
- }
-
- /**
- * Test whether a call to [FlowTimerQueue.poll] returns `null` for an empty queue.
- */
- @Test
- fun testPollEmpty() {
- assertAll(
- { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test whether a call to [FlowTimerQueue.poll] returns the proper value for a queue with a single entry.
- */
- @Test
- fun testSingleEntry() {
- val entry = mockk<FlowStage>()
- entry.deadline = 100
- entry.timerIndex = -1
-
- queue.enqueue(entry)
-
- assertAll(
- { assertEquals(100, queue.peekDeadline()) },
- { assertNull(queue.poll(10L)) },
- { assertEquals(entry, queue.poll(200L)) },
- { assertNull(queue.poll(200L)) },
- )
- }
-
- /**
- * Test whether [FlowTimerQueue.poll] returns values in the queue in the proper order.
- */
- @Test
- fun testMultipleEntries() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 10
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- assertAll(
- { assertEquals(10, queue.peekDeadline()) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that the queue is properly resized when the number of entries exceed the capacity.
- */
- @Test
- fun testResize() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- val entryD = mockk<FlowStage>()
- entryD.deadline = 31
- entryD.timerIndex = -1
-
- queue.enqueue(entryD)
-
- assertAll(
- { assertEquals(20, queue.peekDeadline()) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryD, queue.poll(100L)) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test to verify that we can change the deadline of the last element in the queue.
- */
- @Test
- fun testChangeDeadlineTail() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- entryA.deadline = 10
- queue.enqueue(entryA)
-
- assertAll(
- { assertEquals(10, queue.peekDeadline()) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that we can change the deadline of the head entry in the queue.
- */
- @Test
- fun testChangeDeadlineMiddle() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- entryC.deadline = 10
- queue.enqueue(entryC)
-
- assertAll(
- { assertEquals(10, queue.peekDeadline()) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that we can change the deadline of the head entry in the queue.
- */
- @Test
- fun testChangeDeadlineHead() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- entryB.deadline = 30
- queue.enqueue(entryB)
-
- assertAll(
- { assertEquals(30, queue.peekDeadline()) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that an unchanged deadline results in a no-op.
- */
- @Test
- fun testChangeDeadlineNop() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- // Should be a no-op
- queue.enqueue(entryA)
-
- assertAll(
- { assertEquals(20, queue.peekDeadline()) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that we can remove an entry from the end of the queue.
- */
- @Test
- fun testRemoveEntryTail() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- entryC.deadline = Long.MAX_VALUE
- queue.enqueue(entryC)
-
- assertAll(
- { assertEquals(20, queue.peekDeadline()) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that we can remove an entry from the head of the queue.
- */
- @Test
- fun testRemoveEntryHead() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- entryB.deadline = Long.MAX_VALUE
- queue.enqueue(entryB)
-
- assertAll(
- { assertEquals(58, queue.peekDeadline()) },
- { assertEquals(entryC, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-
- /**
- * Test that we can remove an entry from the middle of a queue.
- */
- @Test
- fun testRemoveEntryMiddle() {
- val entryA = mockk<FlowStage>()
- entryA.deadline = 100
- entryA.timerIndex = -1
-
- queue.enqueue(entryA)
-
- val entryB = mockk<FlowStage>()
- entryB.deadline = 20
- entryB.timerIndex = -1
-
- queue.enqueue(entryB)
-
- val entryC = mockk<FlowStage>()
- entryC.deadline = 58
- entryC.timerIndex = -1
-
- queue.enqueue(entryC)
-
- entryC.deadline = Long.MAX_VALUE
- queue.enqueue(entryC)
-
- assertAll(
- { assertEquals(20, queue.peekDeadline()) },
- { assertEquals(entryB, queue.poll(100L)) },
- { assertEquals(entryA, queue.poll(100L)) },
- { assertNull(queue.poll(100L)) },
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
deleted file mode 100644
index 2aef5174..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.mux
-
-import kotlinx.coroutines.yield
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertAll
-import org.opendc.simulator.flow2.FlowEngine
-import org.opendc.simulator.flow2.sink.SimpleFlowSink
-import org.opendc.simulator.flow2.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [ForwardingFlowMultiplexer] class.
- */
-class ForwardingFlowMultiplexerTest {
- /**
- * Test a trace workload.
- */
- @Test
- fun testTrace() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val switch = ForwardingFlowMultiplexer(graph)
- val sink = SimpleFlowSink(graph, 3200.0f)
- graph.connect(switch.newOutput(), sink.input)
-
- yield()
-
- assertEquals(sink.capacity, switch.capacity) { "Capacity is not detected" }
-
- val workload =
- TraceFlowSource(
- graph,
- TraceFlowSource.Trace(
- longArrayOf(1000, 2000, 3000, 4000),
- floatArrayOf(28.0f, 3500.0f, 0.0f, 183.0f),
- 4,
- ),
- )
- graph.connect(workload.output, switch.newInput())
-
- advanceUntilIdle()
-
- assertAll(
- { assertEquals(4000, timeSource.millis()) { "Took enough time" } },
- )
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
deleted file mode 100644
index 0bcf4a3f..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.mux
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow2.FlowEngine
-import org.opendc.simulator.flow2.sink.SimpleFlowSink
-import org.opendc.simulator.flow2.source.SimpleFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-
-/**
- * Test suite for the [MaxMinFlowMultiplexer] class.
- */
-class MaxMinFlowMultiplexerTest {
- @Test
- fun testSmoke() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
- val switch = MaxMinFlowMultiplexer(graph)
-
- val sinks = List(2) { SimpleFlowSink(graph, 2000.0f) }
- for (source in sinks) {
- graph.connect(switch.newOutput(), source.input)
- }
-
- val source = SimpleFlowSource(graph, 2000.0f, 1.0f)
- graph.connect(source.output, switch.newInput())
-
- advanceUntilIdle()
-
- assertEquals(500, timeSource.millis())
- }
-}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
deleted file mode 100644
index 7085a4b9..00000000
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright (c) 2022 AtLarge Research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package org.opendc.simulator.flow2.sink
-
-import kotlinx.coroutines.delay
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-import org.opendc.simulator.flow2.FlowEngine
-import org.opendc.simulator.flow2.source.SimpleFlowSource
-import org.opendc.simulator.flow2.source.TraceFlowSource
-import org.opendc.simulator.kotlin.runSimulation
-import java.util.concurrent.ThreadLocalRandom
-
-/**
- * Test suite for the [SimpleFlowSink] class.
- */
-class FlowSinkTest {
- @Test
- fun testSmoke() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sink = SimpleFlowSink(graph, 1.0f)
- val source = SimpleFlowSource(graph, 2.0f, 1.0f)
-
- graph.connect(source.output, sink.input)
- advanceUntilIdle()
-
- assertEquals(2000, timeSource.millis())
- }
-
- @Test
- fun testAdjustCapacity() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sink = SimpleFlowSink(graph, 1.0f)
- val source = SimpleFlowSource(graph, 2.0f, 1.0f)
-
- graph.connect(source.output, sink.input)
-
- delay(1000)
- sink.capacity = 0.5f
-
- advanceUntilIdle()
-
- assertEquals(3000, timeSource.millis())
- }
-
- @Test
- fun testUtilization() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sink = SimpleFlowSink(graph, 1.0f)
- val source = SimpleFlowSource(graph, 2.0f, 0.5f)
-
- graph.connect(source.output, sink.input)
- advanceUntilIdle()
-
- assertEquals(4000, timeSource.millis())
- }
-
- @Test
- fun testFragments() =
- runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
-
- val sink = SimpleFlowSink(graph, 1.0f)
- val trace =
- TraceFlowSource.Trace(
- longArrayOf(1000, 2000, 3000, 4000),
- floatArrayOf(1.0f, 0.5f, 2.0f, 1.0f),
- 4,
- )
- val source =
- TraceFlowSource(
- graph,
- trace,
- )
-
- graph.connect(source.output, sink.input)
- advanceUntilIdle()
-
- assertEquals(4000, timeSource.millis())
- }
-
- @Test
- fun benchmarkSink() {
- val random = ThreadLocalRandom.current()
- val traceSize = 10000000
- val trace =
- TraceFlowSource.Trace(
- LongArray(traceSize) { it * 1000L },
- FloatArray(traceSize) { random.nextDouble(0.0, 4500.0).toFloat() },
- traceSize,
- )
-
- return runSimulation {
- val engine = FlowEngine.create(dispatcher)
- val graph = engine.newGraph()
- val sink = SimpleFlowSink(graph, 4200.0f)
- val source = TraceFlowSource(graph, trace)
- graph.connect(source.output, sink.input)
- }
- }
-}