summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-09-01 17:10:52 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-10-21 22:13:04 +0200
commit66a5266c2e6f060a11fffef1e9eed9e716056853 (patch)
treef52b0dce630a35185aa5c599da38a2ab44a5f056
parente4f4e1c4ebd02278dc1c24ee481357989af6abe5 (diff)
feat(sim/flow): Add forwarding flow multiplexer
This change implements a new `FlowMultiplexer` that forwards the inputs directly to one of the pre-allocated outputs.
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java233
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt66
2 files changed, 299 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java
new file mode 100644
index 00000000..9ff6a4c8
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import org.opendc.simulator.flow2.FlowGraph;
+import org.opendc.simulator.flow2.FlowStage;
+import org.opendc.simulator.flow2.FlowStageLogic;
+import org.opendc.simulator.flow2.InHandler;
+import org.opendc.simulator.flow2.InPort;
+import org.opendc.simulator.flow2.Inlet;
+import org.opendc.simulator.flow2.OutHandler;
+import org.opendc.simulator.flow2.OutPort;
+import org.opendc.simulator.flow2.Outlet;
+
+/**
+ * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively.
+ * This means that a single input is directly connected to an output and that the multiplexer can only support as many
+ * inputs as outputs.
+ */
+public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic {
+ public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler();
+ public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler();
+
+ private final FlowStage stage;
+
+ private InPort[] inlets;
+ private OutPort[] outlets;
+ private final BitSet activeInputs;
+ private final BitSet activeOutputs;
+ private final BitSet availableOutputs;
+
+ public ForwardingFlowMultiplexer(FlowGraph graph) {
+ this.stage = graph.newStage(this);
+
+ this.inlets = new InPort[4];
+ this.activeInputs = new BitSet();
+ this.outlets = new OutPort[4];
+ this.activeOutputs = new BitSet();
+ this.availableOutputs = new BitSet();
+ }
+
+ @Override
+ public int getInputCount() {
+ return activeInputs.length();
+ }
+
+ @Override
+ public Inlet newInput() {
+ final BitSet activeInputs = this.activeInputs;
+ int slot = activeInputs.nextClearBit(0);
+
+ InPort inPort = stage.getInlet("in" + slot);
+ inPort.setMask(true);
+
+ InPort[] inlets = this.inlets;
+ if (slot >= inlets.length) {
+ int newLength = inlets.length + (inlets.length >> 1);
+ inlets = Arrays.copyOf(inlets, newLength);
+ this.inlets = inlets;
+ }
+
+ final BitSet availableOutputs = this.availableOutputs;
+ int outSlot = availableOutputs.nextSetBit(0);
+
+ if (outSlot < 0) {
+ throw new IllegalStateException("No capacity available for a new input");
+ }
+
+ inlets[slot] = inPort;
+ activeInputs.set(slot);
+
+ OutPort outPort = outlets[outSlot];
+ availableOutputs.clear(outSlot);
+
+ inPort.setHandler(new ForwardingInHandler(outPort));
+ outPort.setHandler(new ForwardingOutHandler(inPort));
+
+ inPort.pull(outPort.getCapacity());
+
+ return inPort;
+ }
+
+ @Override
+ public void releaseInput(Inlet inlet) {
+ InPort port = (InPort) inlet;
+ int slot = port.getId();
+
+ final BitSet activeInputs = this.activeInputs;
+
+ if (!activeInputs.get(slot)) {
+ return;
+ }
+
+ port.cancel(null);
+ activeInputs.clear(slot);
+
+ ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler();
+ availableOutputs.set(inHandler.output.getId());
+
+ port.setHandler(IDLE_IN_HANDLER);
+ }
+
+ @Override
+ public int getOutputCount() {
+ return activeOutputs.length();
+ }
+
+ @Override
+ public Outlet newOutput() {
+ final BitSet activeOutputs = this.activeOutputs;
+ int slot = activeOutputs.nextClearBit(0);
+
+ OutPort port = stage.getOutlet("out" + slot);
+ OutPort[] outlets = this.outlets;
+ if (slot >= outlets.length) {
+ int newLength = outlets.length + (outlets.length >> 1);
+ outlets = Arrays.copyOf(outlets, newLength);
+ this.outlets = outlets;
+ }
+ outlets[slot] = port;
+
+ activeOutputs.set(slot);
+ availableOutputs.set(slot);
+ return port;
+ }
+
+ @Override
+ public void releaseOutput(Outlet outlet) {
+ OutPort port = (OutPort) outlet;
+ int slot = port.getId();
+ activeInputs.clear(slot);
+ availableOutputs.clear(slot);
+ port.complete();
+
+ port.setHandler(IDLE_OUT_HANDLER);
+ }
+
+ @Override
+ public long onUpdate(FlowStage ctx, long now) {
+ return Long.MAX_VALUE;
+ }
+
+ class ForwardingInHandler implements InHandler {
+ final OutPort output;
+
+ ForwardingInHandler(OutPort output) {
+ this.output = output;
+ }
+
+ @Override
+ public float getRate(InPort port) {
+ return output.getRate();
+ }
+
+ @Override
+ public void onPush(InPort port, float rate) {
+ output.push(rate);
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {
+ final OutPort output = this.output;
+ output.push(0.f);
+
+ releaseInput(port);
+ }
+ }
+
+ private class ForwardingOutHandler implements OutHandler {
+ private final InPort input;
+
+ ForwardingOutHandler(InPort input) {
+ this.input = input;
+ }
+
+ @Override
+ public void onPull(OutPort port, float capacity) {
+ input.pull(capacity);
+ }
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {
+ input.cancel(cause);
+
+ releaseOutput(port);
+ }
+ }
+
+ private static class IdleInHandler implements InHandler {
+ @Override
+ public float getRate(InPort port) {
+ return 0.f;
+ }
+
+ @Override
+ public void onPush(InPort port, float rate) {
+ port.cancel(new IllegalStateException("Inlet is not allocated"));
+ }
+
+ @Override
+ public void onUpstreamFinish(InPort port, Throwable cause) {}
+ }
+
+ private static class IdleOutHandler implements OutHandler {
+ @Override
+ public void onPull(OutPort port, float capacity) {}
+
+ @Override
+ public void onDownstreamFinish(OutPort port, Throwable cause) {}
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
new file mode 100644
index 00000000..a2ed2195
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2022 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow2.mux
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.opendc.simulator.flow2.FlowEngine
+import org.opendc.simulator.flow2.sink.SimpleFlowSink
+import org.opendc.simulator.flow2.source.TraceFlowSource
+import org.opendc.simulator.kotlin.runSimulation
+
+/**
+ * Test suite for the [ForwardingFlowMultiplexer] class.
+ */
+class ForwardingFlowMultiplexerTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runSimulation {
+ val engine = FlowEngine.create(coroutineContext, clock)
+ val graph = engine.newGraph()
+
+ val switch = ForwardingFlowMultiplexer(graph)
+ val sink = SimpleFlowSink(graph, 3200.0f)
+ graph.connect(switch.newOutput(), sink.input)
+
+ val workload =
+ TraceFlowSource(
+ graph,
+ TraceFlowSource.Trace(
+ longArrayOf(1000, 2000, 3000, 4000),
+ floatArrayOf(28.0f, 3500.0f, 0.0f, 183.0f),
+ 4
+ )
+ )
+ graph.connect(workload.output, switch.newInput())
+
+ advanceUntilIdle()
+
+ assertAll(
+ { assertEquals(4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+}