From ad88144923d76dfc421f0b22a0b4e670b3f6366e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 21 Aug 2022 14:27:41 +0200 Subject: perf(sim/flow): Add support for multi-flow stages This change adds support for creating nodes in a flow graph that support multiple inputs and outputs directly, instead of our current approach where we need to re-implement the `FlowConsumerContext` interface in order to support multiple inputs or outputs. --- .../org/opendc/simulator/flow/FlowBenchmarks.kt | 2 +- .../org/opendc/simulator/flow2/FlowBenchmarks.kt | 108 +++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt (limited to 'opendc-simulator/opendc-simulator-flow/src/jmh') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index 58f84d82..9e0a4a5e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -47,7 +47,7 @@ class FlowBenchmarks { @Setup fun setUp() { val random = ThreadLocalRandom.current() - val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + val entries = List(1000000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } trace = entries.asSequence() } diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt new file mode 100644 index 00000000..1b0e2e9e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import kotlinx.coroutines.launch +import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.TraceFlowSource +import org.opendc.simulator.kotlin.runSimulation +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Fork +import org.openjdk.jmh.annotations.Measurement +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.Warmup +import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +class FlowBenchmarks { + private lateinit var trace: TraceFlowSource.Trace + + @Setup + fun setUp() { + val random = ThreadLocalRandom.current() + val traceSize = 10_000_000 + trace = TraceFlowSource.Trace( + LongArray(traceSize) { (it + 1) * 1000L }, + FloatArray(traceSize) { random.nextFloat(0.0f, 4500.0f) }, + traceSize + ) + } + + @Benchmark + fun benchmarkSink() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val sink = SimpleFlowSink(graph, 4200.0f) + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, sink.input) + } + } + + @Benchmark + fun benchmarkMuxMaxMinSingleSource() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val switch = MaxMinFlowMultiplexer(graph) + + val sinkA = SimpleFlowSink(graph, 3000.0f) + val sinkB = SimpleFlowSink(graph, 3000.0f) + + graph.connect(switch.newOutput(), sinkA.input) + graph.connect(switch.newOutput(), sinkB.input) + + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, switch.newInput()) + } + } + + @Benchmark + fun benchmarkMuxMaxMinTripleSource() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val switch = MaxMinFlowMultiplexer(graph) + + val sinkA = SimpleFlowSink(graph, 3000.0f) + val sinkB = SimpleFlowSink(graph, 3000.0f) + + graph.connect(switch.newOutput(), sinkA.input) + graph.connect(switch.newOutput(), sinkB.input) + + repeat(3) { + launch { + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, switch.newInput()) + } + } + } + } +} -- cgit v1.2.3 From e4f4e1c4ebd02278dc1c24ee481357989af6abe5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 1 Sep 2022 12:02:06 +0200 Subject: feat(sim/flow): Support flow transformations This change adds a new component, FlowTransform, which is able to transform the flow from one component to another, based on some inversible transformation. Such as component is useful when converting between different units of flow between different components. --- .../kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'opendc-simulator/opendc-simulator-flow/src/jmh') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt index 1b0e2e9e..fb112082 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt @@ -26,6 +26,8 @@ import kotlinx.coroutines.launch import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer import org.opendc.simulator.flow2.sink.SimpleFlowSink import org.opendc.simulator.flow2.source.TraceFlowSource +import org.opendc.simulator.flow2.util.FlowTransformer +import org.opendc.simulator.flow2.util.FlowTransforms import org.opendc.simulator.kotlin.runSimulation import org.openjdk.jmh.annotations.Benchmark import org.openjdk.jmh.annotations.Fork @@ -66,6 +68,20 @@ class FlowBenchmarks { } } + @Benchmark + fun benchmarkForward() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val sink = SimpleFlowSink(graph, 4200.0f) + val source = TraceFlowSource(graph, trace) + val forwarder = FlowTransformer(graph, FlowTransforms.noop()) + + graph.connect(source.output, forwarder.input) + graph.connect(forwarder.output, sink.input) + } + } + @Benchmark fun benchmarkMuxMaxMinSingleSource() { return runSimulation { -- cgit v1.2.3 From 2a457d9e5480407d76440a2277817cb735c86ae1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 4 Oct 2022 16:39:28 +0200 Subject: refactor(sim/flow): Remove old flow simulator This change removes the old version of the flow simulator from the OpenDC repository. The old version has been replaced by the new flow2 framework which is able to simulate flows more efficiently. --- .../org/opendc/simulator/flow/FlowBenchmarks.kt | 137 --------------------- 1 file changed, 137 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt (limited to 'opendc-simulator/opendc-simulator-flow/src/jmh') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt deleted file mode 100644 index 9e0a4a5e..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import kotlinx.coroutines.launch -import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer -import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Fork -import org.openjdk.jmh.annotations.Measurement -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.Setup -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Warmup -import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.TimeUnit - -@State(Scope.Thread) -@Fork(1) -@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -class FlowBenchmarks { - private lateinit var trace: Sequence - - @Setup - fun setUp() { - val random = ThreadLocalRandom.current() - val entries = List(1000000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } - trace = entries.asSequence() - } - - @Benchmark - fun benchmarkSink() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val provider = FlowSink(engine, 4200.0) - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkForward() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val provider = FlowSink(engine, 4200.0) - val forwarder = FlowForwarder(engine) - provider.startConsumer(forwarder) - return@runSimulation forwarder.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxMaxMinSingleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - val provider = switch.newInput() - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxMaxMinTripleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - repeat(3) { - launch { - val provider = switch.newInput() - provider.consume(TraceFlowSource(trace)) - } - } - } - } - - @Benchmark - fun benchmarkMuxExclusiveSingleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = ForwardingFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - val provider = switch.newInput() - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxExclusiveTripleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = ForwardingFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - repeat(2) { - launch { - val provider = switch.newInput() - provider.consume(TraceFlowSource(trace)) - } - } - } - } -} -- cgit v1.2.3