diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-09-01 17:10:52 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-21 22:13:04 +0200 |
| commit | 66a5266c2e6f060a11fffef1e9eed9e716056853 (patch) | |
| tree | f52b0dce630a35185aa5c599da38a2ab44a5f056 /opendc-simulator/opendc-simulator-flow/src/main/java | |
| parent | e4f4e1c4ebd02278dc1c24ee481357989af6abe5 (diff) | |
feat(sim/flow): Add forwarding flow multiplexer
This change implements a new `FlowMultiplexer` that forwards the inputs
directly to one of the pre-allocated outputs.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/java')
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java | 233 |
1 files changed, 233 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java new file mode 100644 index 00000000..9ff6a4c8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java @@ -0,0 +1,233 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import java.util.Arrays; +import java.util.BitSet; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively. + * This means that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + */ +public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler(); + public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler(); + + private final FlowStage stage; + + private InPort[] inlets; + private OutPort[] outlets; + private final BitSet activeInputs; + private final BitSet activeOutputs; + private final BitSet availableOutputs; + + public ForwardingFlowMultiplexer(FlowGraph graph) { + this.stage = graph.newStage(this); + + this.inlets = new InPort[4]; + this.activeInputs = new BitSet(); + this.outlets = new OutPort[4]; + this.activeOutputs = new BitSet(); + this.availableOutputs = new BitSet(); + } + + @Override + public int getInputCount() { + return activeInputs.length(); + } + + @Override + public Inlet newInput() { + final BitSet activeInputs = this.activeInputs; + int slot = activeInputs.nextClearBit(0); + + InPort inPort = stage.getInlet("in" + slot); + inPort.setMask(true); + + InPort[] inlets = this.inlets; + if (slot >= inlets.length) { + int newLength = inlets.length + (inlets.length >> 1); + inlets = Arrays.copyOf(inlets, newLength); + this.inlets = inlets; + } + + final BitSet availableOutputs = this.availableOutputs; + int outSlot = availableOutputs.nextSetBit(0); + + if (outSlot < 0) { + throw new IllegalStateException("No capacity available for a new input"); + } + + inlets[slot] = inPort; + activeInputs.set(slot); + + OutPort outPort = outlets[outSlot]; + availableOutputs.clear(outSlot); + + inPort.setHandler(new ForwardingInHandler(outPort)); + outPort.setHandler(new ForwardingOutHandler(inPort)); + + inPort.pull(outPort.getCapacity()); + + return inPort; + } + + @Override + public void releaseInput(Inlet inlet) { + InPort port = (InPort) inlet; + int slot = port.getId(); + + final BitSet activeInputs = this.activeInputs; + + if (!activeInputs.get(slot)) { + return; + } + + port.cancel(null); + activeInputs.clear(slot); + + ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler(); + availableOutputs.set(inHandler.output.getId()); + + port.setHandler(IDLE_IN_HANDLER); + } + + @Override + public int getOutputCount() { + return activeOutputs.length(); + } + + @Override + public Outlet newOutput() { + final BitSet activeOutputs = this.activeOutputs; + int slot = activeOutputs.nextClearBit(0); + + OutPort port = stage.getOutlet("out" + slot); + OutPort[] outlets = this.outlets; + if (slot >= outlets.length) { + int newLength = outlets.length + (outlets.length >> 1); + outlets = Arrays.copyOf(outlets, newLength); + this.outlets = outlets; + } + outlets[slot] = port; + + activeOutputs.set(slot); + availableOutputs.set(slot); + return port; + } + + @Override + public void releaseOutput(Outlet outlet) { + OutPort port = (OutPort) outlet; + int slot = port.getId(); + activeInputs.clear(slot); + availableOutputs.clear(slot); + port.complete(); + + port.setHandler(IDLE_OUT_HANDLER); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } + + class ForwardingInHandler implements InHandler { + final OutPort output; + + ForwardingInHandler(OutPort output) { + this.output = output; + } + + @Override + public float getRate(InPort port) { + return output.getRate(); + } + + @Override + public void onPush(InPort port, float rate) { + output.push(rate); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + final OutPort output = this.output; + output.push(0.f); + + releaseInput(port); + } + } + + private class ForwardingOutHandler implements OutHandler { + private final InPort input; + + ForwardingOutHandler(InPort input) { + this.input = input; + } + + @Override + public void onPull(OutPort port, float capacity) { + input.pull(capacity); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + input.cancel(cause); + + releaseOutput(port); + } + } + + private static class IdleInHandler implements InHandler { + @Override + public float getRate(InPort port) { + return 0.f; + } + + @Override + public void onPush(InPort port, float rate) { + port.cancel(new IllegalStateException("Inlet is not allocated")); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) {} + } + + private static class IdleOutHandler implements OutHandler { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) {} + } +} |
