From 44215bd668c5fa3efe2f57fc577824478b00af57 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Sep 2022 14:38:34 +0200 Subject: refactor(sim/compute): Re-implement using flow2 This change re-implements the OpenDC compute simulator framework using the new flow2 framework for modelling multi-edge flow networks. The re-implementation is written in Java and focusses on performance and clean API surface. --- .../simulator/flow2/mux/FlowMultiplexer.java | 10 +++++ .../flow2/mux/FlowMultiplexerFactory.java | 51 ++++++++++++++++++++++ .../flow2/mux/ForwardingFlowMultiplexer.java | 15 +++++++ .../simulator/flow2/mux/MaxMinFlowMultiplexer.java | 20 +++++++-- .../simulator/flow2/source/RuntimeFlowSource.java | 5 +-- .../simulator/flow2/source/SimpleFlowSource.java | 5 +-- .../simulator/flow2/source/TraceFlowSource.java | 3 +- 7 files changed, 98 insertions(+), 11 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java index 2a23b039..dec98955 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java @@ -30,6 +30,16 @@ import org.opendc.simulator.flow2.Outlet; * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs. */ public interface FlowMultiplexer { + /** + * Return maximum number of inputs supported by the multiplexer. + */ + int getMaxInputs(); + + /** + * Return maximum number of outputs supported by the multiplexer. + */ + int getMaxOutputs(); + /** * Return the number of active inputs on this multiplexer. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java new file mode 100644 index 00000000..0b5b9141 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowGraph; + +/** + * Factory interface for a {@link FlowMultiplexer} implementation. + */ +public interface FlowMultiplexerFactory { + /** + * Construct a new {@link FlowMultiplexer} belonging to the specified {@link FlowGraph}. + * + * @param graph The graph to which the multiplexer belongs. + */ + FlowMultiplexer newMultiplexer(FlowGraph graph); + + /** + * Return a {@link FlowMultiplexerFactory} for {@link ForwardingFlowMultiplexer} instances. + */ + static FlowMultiplexerFactory forwardingMultiplexer() { + return ForwardingFlowMultiplexer.FACTORY; + } + + /** + * Return a {@link FlowMultiplexerFactory} for {@link MaxMinFlowMultiplexer} instances. + */ + static FlowMultiplexerFactory maxMinMultiplexer() { + return MaxMinFlowMultiplexer.FACTORY; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java index 6394c3fd..abe3510b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java @@ -40,6 +40,11 @@ import org.opendc.simulator.flow2.Outlet; * inputs as outputs. */ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + /** + * Factory implementation for this implementation. + */ + static FlowMultiplexerFactory FACTORY = ForwardingFlowMultiplexer::new; + public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler(); public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler(); @@ -85,6 +90,16 @@ public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowSta return rate; } + @Override + public int getMaxInputs() { + return getOutputCount(); + } + + @Override + public int getMaxOutputs() { + return Integer.MAX_VALUE; + } + @Override public int getInputCount() { return activeInputs.length(); diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java index 77922066..ac5c4f5c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java @@ -22,6 +22,8 @@ package org.opendc.simulator.flow2.mux; +import java.util.Arrays; +import java.util.BitSet; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -32,9 +34,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.Arrays; -import java.util.BitSet; - /** * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs * using max-min fair sharing. @@ -43,6 +42,11 @@ import java.util.BitSet; * output capacity, but allows individual inputs to use more capacity if there is still capacity left. */ public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + /** + * Factory implementation for this implementation. + */ + static FlowMultiplexerFactory FACTORY = MaxMinFlowMultiplexer::new; + private final FlowStage stage; private final BitSet activeInputs; private final BitSet activeOutputs; @@ -90,6 +94,16 @@ public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLo return rate; } + @Override + public int getMaxInputs() { + return Integer.MAX_VALUE; + } + + @Override + public int getMaxOutputs() { + return Integer.MAX_VALUE; + } + @Override public long onUpdate(FlowStage ctx, long now) { float capacity = this.capacity; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java index a237c81e..c09987cd 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java @@ -22,6 +22,7 @@ package org.opendc.simulator.flow2.source; +import java.util.function.Consumer; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -29,8 +30,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.function.Consumer; - /** * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization. */ @@ -53,7 +52,7 @@ public class RuntimeFlowSource implements FlowSource, FlowStageLogic { * @param completionHandler A callback invoked when the source completes. */ public RuntimeFlowSource( - FlowGraph graph, long duration, float utilization, Consumer completionHandler) { + FlowGraph graph, long duration, float utilization, Consumer completionHandler) { if (duration <= 0) { throw new IllegalArgumentException("Duration must be positive and non-zero"); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java index 764a20a8..a0e9cb9d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java @@ -22,6 +22,7 @@ package org.opendc.simulator.flow2.source; +import java.util.function.Consumer; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -29,8 +30,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.function.Consumer; - /** * A flow source that contains a fixed amount and is pushed with a given utilization. */ @@ -52,7 +51,7 @@ public final class SimpleFlowSource implements FlowSource, FlowStageLogic { * @param completionHandler A callback invoked when the source completes. */ public SimpleFlowSource( - FlowGraph graph, float amount, float utilization, Consumer completionHandler) { + FlowGraph graph, float amount, float utilization, Consumer completionHandler) { if (amount < 0.0) { throw new IllegalArgumentException("Amount must be non-negative"); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java index 96d43aef..e8abc2d7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java @@ -22,6 +22,7 @@ package org.opendc.simulator.flow2.source; +import java.util.function.Consumer; import org.opendc.simulator.flow2.FlowGraph; import org.opendc.simulator.flow2.FlowStage; import org.opendc.simulator.flow2.FlowStageLogic; @@ -29,8 +30,6 @@ import org.opendc.simulator.flow2.OutHandler; import org.opendc.simulator.flow2.OutPort; import org.opendc.simulator.flow2.Outlet; -import java.util.function.Consumer; - /** * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time. */ -- cgit v1.2.3