diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-21 22:32:05 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-21 22:32:05 +0200 |
| commit | fa7fdbb0126ea465130961dc37c4ef2d6feb36e9 (patch) | |
| tree | 9cd46dd7970870b78990d6c35e8e2759d7cf5a13 /opendc-simulator/opendc-simulator-flow/src | |
| parent | 29beb50018cf2ad87b252c6c080f8c5de4600349 (diff) | |
| parent | 290e1fe14460d91e4703e55ac5f05dbe7b4505f7 (diff) | |
merge: Implement multi-flow stages in simulator (#110)
This pull request introduces the new `flow2` multi-flow simulator into the OpenDC codebase
and adjust all existing modules to make use of this new simulator.
The new simulator models flow as a network of components, which can each receive flow
from (potentially) multiple other components. In the previous simulator, the framework itself
supported only single flows between components and required re-implementation of many
components to support multiplexing flows.
Initial benchmarks show performance improvements in the range 2x–4x for large scale experiments
such as the Capelin benchmarks.
## Implementation Notes :hammer_and_pick:
* Add support for multi-flow stages
* Support flow transformations
* Add forwarding flow multiplexer
* Expose metrics on FlowMultiplexer
* Re-implement network sim using flow2
* Re-implement power sim using flow2
* Re-implement compute sim using flow2
* Optimize workload implementation of SimTrace
* Remove old flow simulator
* Add log4j-core dependency
## External Dependencies :four_leaf_clover:
* N/A
## Breaking API Changes :warning:
* Removal of the `org.opendc.simulator.flow` package. You should now use
the new flow simulator located in `org.opendc.simulator.flow2`.
* `PowerModel` interface is replaced by the `CpuPowerModel` interface.
* `PowerDriver` interface is replaced by the `SimPsu` and `SimPsuFactory` interfaces.
* Removal of `SimTraceWorkload`. Instead, create a workload from a `SimTrace` using
`createWorkload(offset)`.
* `ScalingGovernor` has been split in a `ScalingGovernor` and `ScalingGovernorFactory`.
* All modules in `opendc-simulator` are now written in Java. This means that default
parameters are not supported anymore for these modules.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
62 files changed, 4280 insertions, 4537 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt deleted file mode 100644 index 58f84d82..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import kotlinx.coroutines.launch -import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer -import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Fork -import org.openjdk.jmh.annotations.Measurement -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.Setup -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Warmup -import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.TimeUnit - -@State(Scope.Thread) -@Fork(1) -@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -class FlowBenchmarks { - private lateinit var trace: Sequence<TraceFlowSource.Fragment> - - @Setup - fun setUp() { - val random = ThreadLocalRandom.current() - val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } - trace = entries.asSequence() - } - - @Benchmark - fun benchmarkSink() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val provider = FlowSink(engine, 4200.0) - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkForward() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val provider = FlowSink(engine, 4200.0) - val forwarder = FlowForwarder(engine) - provider.startConsumer(forwarder) - return@runSimulation forwarder.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxMaxMinSingleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - val provider = switch.newInput() - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxMaxMinTripleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - repeat(3) { - launch { - val provider = switch.newInput() - provider.consume(TraceFlowSource(trace)) - } - } - } - } - - @Benchmark - fun benchmarkMuxExclusiveSingleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = ForwardingFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - val provider = switch.newInput() - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxExclusiveTripleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = ForwardingFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - repeat(2) { - launch { - val provider = switch.newInput() - provider.consume(TraceFlowSource(trace)) - } - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt new file mode 100644 index 00000000..fb112082 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import kotlinx.coroutines.launch +import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.TraceFlowSource +import org.opendc.simulator.flow2.util.FlowTransformer +import org.opendc.simulator.flow2.util.FlowTransforms +import org.opendc.simulator.kotlin.runSimulation +import org.openjdk.jmh.annotations.Benchmark +import org.openjdk.jmh.annotations.Fork +import org.openjdk.jmh.annotations.Measurement +import org.openjdk.jmh.annotations.Scope +import org.openjdk.jmh.annotations.Setup +import org.openjdk.jmh.annotations.State +import org.openjdk.jmh.annotations.Warmup +import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +class FlowBenchmarks { + private lateinit var trace: TraceFlowSource.Trace + + @Setup + fun setUp() { + val random = ThreadLocalRandom.current() + val traceSize = 10_000_000 + trace = TraceFlowSource.Trace( + LongArray(traceSize) { (it + 1) * 1000L }, + FloatArray(traceSize) { random.nextFloat(0.0f, 4500.0f) }, + traceSize + ) + } + + @Benchmark + fun benchmarkSink() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val sink = SimpleFlowSink(graph, 4200.0f) + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, sink.input) + } + } + + @Benchmark + fun benchmarkForward() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val sink = SimpleFlowSink(graph, 4200.0f) + val source = TraceFlowSource(graph, trace) + val forwarder = FlowTransformer(graph, FlowTransforms.noop()) + + graph.connect(source.output, forwarder.input) + graph.connect(forwarder.output, sink.input) + } + } + + @Benchmark + fun benchmarkMuxMaxMinSingleSource() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val switch = MaxMinFlowMultiplexer(graph) + + val sinkA = SimpleFlowSink(graph, 3000.0f) + val sinkB = SimpleFlowSink(graph, 3000.0f) + + graph.connect(switch.newOutput(), sinkA.input) + graph.connect(switch.newOutput(), sinkB.input) + + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, switch.newInput()) + } + } + + @Benchmark + fun benchmarkMuxMaxMinTripleSource() { + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val switch = MaxMinFlowMultiplexer(graph) + + val sinkA = SimpleFlowSink(graph, 3000.0f) + val sinkB = SimpleFlowSink(graph, 3000.0f) + + graph.connect(switch.newOutput(), sinkA.input) + graph.connect(switch.newOutput(), sinkB.input) + + repeat(3) { + launch { + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, switch.newInput()) + } + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java new file mode 100644 index 00000000..0ebb0da9 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import kotlin.coroutines.ContinuationInterceptor; +import kotlin.coroutines.CoroutineContext; +import kotlinx.coroutines.Delay; + +/** + * A {@link FlowEngine} simulates a generic flow network. + * <p> + * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public final class FlowEngine implements Runnable { + /** + * The queue of {@link FlowStage} updates that are scheduled for immediate execution. + */ + private final FlowStageQueue queue = new FlowStageQueue(256); + + /** + * A priority queue containing the {@link FlowStage} updates to be scheduled in the future. + */ + private final FlowTimerQueue timerQueue = new FlowTimerQueue(256); + + /** + * The stack of engine invocations to occur in the future. + */ + private final InvocationStack futureInvocations = new InvocationStack(256); + + /** + * A flag to indicate that the engine is active. + */ + private boolean active; + + private final CoroutineContext coroutineContext; + private final Clock clock; + private final Delay delay; + + /** + * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}. + */ + public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) { + return new FlowEngine(coroutineContext, clock); + } + + FlowEngine(CoroutineContext coroutineContext, Clock clock) { + this.coroutineContext = coroutineContext; + this.clock = clock; + + CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key; + this.delay = (Delay) coroutineContext.get(key); + } + + /** + * Obtain the (virtual) {@link Clock} driving the simulation. + */ + public Clock getClock() { + return clock; + } + + /** + * Return a new {@link FlowGraph} that can be used to build a flow network. + */ + public FlowGraph newGraph() { + return new RootGraph(this); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + * <p> + * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + */ + void scheduleImmediate(long now, FlowStage ctx) { + scheduleImmediateInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + trySchedule(futureInvocations, now, now); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated immediately during the active engine cycle. + * <p> + * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. + * <p> + * This method should only be invoked while inside an engine cycle. + */ + void scheduleImmediateInContext(FlowStage ctx) { + queue.add(ctx); + } + + /** + * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + */ + void scheduleDelayed(FlowStage ctx) { + scheduleDelayedInContext(ctx); + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (active) { + return; + } + + long deadline = timerQueue.peekDeadline(); + if (deadline != Long.MAX_VALUE) { + trySchedule(futureInvocations, clock.millis(), deadline); + } + } + + /** + * Enqueue the specified {@link FlowStage} to be updated at its updated deadline. + * <p> + * This method should only be invoked while inside an engine cycle. + */ + void scheduleDelayedInContext(FlowStage ctx) { + FlowTimerQueue timerQueue = this.timerQueue; + timerQueue.enqueue(ctx); + } + + /** + * Run all the enqueued actions for the specified timestamp (<code>now</code>). + */ + private void doRunEngine(long now) { + final FlowStageQueue queue = this.queue; + final FlowTimerQueue timerQueue = this.timerQueue; + + try { + // Mark the engine as active to prevent concurrent calls to this method + active = true; + + // Execute all scheduled updates at current timestamp + while (true) { + final FlowStage ctx = timerQueue.poll(now); + if (ctx == null) { + break; + } + + ctx.onUpdate(now); + } + + // Execute all immediate updates + while (true) { + final FlowStage ctx = queue.poll(); + if (ctx == null) { + break; + } + + ctx.onUpdate(now); + } + } finally { + active = false; + } + + // Schedule an engine invocation for the next update to occur. + long headDeadline = timerQueue.peekDeadline(); + if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { + trySchedule(futureInvocations, now, headDeadline); + } + } + + @Override + public void run() { + doRunEngine(futureInvocations.poll()); + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param scheduled The queue of scheduled invocations. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + */ + private void trySchedule(InvocationStack scheduled, long now, long target) { + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (scheduled.tryAdd(target)) { + delay.invokeOnTimeout(target - now, this, coroutineContext); + } + } + + /** + * Internal implementation of a root {@link FlowGraph}. + */ + private static final class RootGraph implements FlowGraphInternal { + private final FlowEngine engine; + private final List<FlowStage> stages = new ArrayList<>(); + + public RootGraph(FlowEngine engine) { + this.engine = engine; + } + + @Override + public FlowEngine getEngine() { + return engine; + } + + @Override + public FlowStage newStage(FlowStageLogic logic) { + final FlowEngine engine = this.engine; + final FlowStage stage = new FlowStage(this, logic); + stages.add(stage); + long now = engine.getClock().millis(); + stage.invalidate(now); + return stage; + } + + @Override + public void connect(Outlet outlet, Inlet inlet) { + FlowGraphInternal.connect(this, outlet, inlet); + } + + @Override + public void disconnect(Outlet outlet) { + FlowGraphInternal.disconnect(this, outlet); + } + + @Override + public void disconnect(Inlet inlet) { + FlowGraphInternal.disconnect(this, inlet); + } + + /** + * Internal method to remove the specified {@link FlowStage} from the graph. + */ + @Override + public void detach(FlowStage stage) { + stages.remove(stage); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java new file mode 100644 index 00000000..f45be6cd --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraph.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A representation of a flow network. A flow network is a directed graph where each edge has a capacity and receives an + * amount of flow that cannot exceed the edge's capacity. + */ +public interface FlowGraph { + /** + * Return the {@link FlowEngine} driving the simulation of the graph. + */ + FlowEngine getEngine(); + + /** + * Create a new {@link FlowStage} representing a node in the flow network. + * + * @param logic The logic for handling the events of the stage. + */ + FlowStage newStage(FlowStageLogic logic); + + /** + * Add an edge between the specified outlet port and inlet port in this graph. + * + * @param outlet The outlet of the source from which the flow originates. + * @param inlet The inlet of the sink that should receive the flow. + */ + void connect(Outlet outlet, Inlet inlet); + + /** + * Disconnect the specified {@link Outlet} (if connected). + * + * @param outlet The outlet to disconnect. + */ + void disconnect(Outlet outlet); + + /** + * Disconnect the specified {@link Inlet} (if connected). + * + * @param inlet The inlet to disconnect. + */ + void disconnect(Inlet inlet); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java new file mode 100644 index 00000000..0f608b60 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowGraphInternal.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Interface implemented by {@link FlowGraph} implementations. + */ +interface FlowGraphInternal extends FlowGraph { + /** + * Internal method to remove the specified {@link FlowStage} from the graph. + */ + void detach(FlowStage stage); + + /** + * Helper method to connect an outlet to an inlet. + */ + static void connect(FlowGraph graph, Outlet outlet, Inlet inlet) { + if (!(outlet instanceof OutPort) || !(inlet instanceof InPort)) { + throw new IllegalArgumentException("Invalid outlet or inlet passed to graph"); + } + + InPort inPort = (InPort) inlet; + OutPort outPort = (OutPort) outlet; + + if (!graph.equals(outPort.getGraph()) || !graph.equals(inPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } else if (outPort.input != null || inPort.output != null) { + throw new IllegalStateException("Inlet or outlet already connected"); + } + + outPort.input = inPort; + inPort.output = outPort; + + inPort.connect(); + outPort.connect(); + } + + /** + * Helper method to disconnect an outlet. + */ + static void disconnect(FlowGraph graph, Outlet outlet) { + if (!(outlet instanceof OutPort)) { + throw new IllegalArgumentException("Invalid outlet passed to graph"); + } + + OutPort outPort = (OutPort) outlet; + + if (!graph.equals(outPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } + + outPort.cancel(null); + outPort.complete(); + } + + /** + * Helper method to disconnect an inlet. + */ + static void disconnect(FlowGraph graph, Inlet inlet) { + if (!(inlet instanceof InPort)) { + throw new IllegalArgumentException("Invalid outlet passed to graph"); + } + + InPort inPort = (InPort) inlet; + + if (!graph.equals(inPort.getGraph())) { + throw new IllegalArgumentException("Outlet or inlet does not belong to graph"); + } + + inPort.finish(null); + inPort.cancel(null); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java new file mode 100644 index 00000000..4d098043 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java @@ -0,0 +1,303 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link FlowStage} represents a node in a {@link FlowGraph}. + */ +public final class FlowStage { + private static final Logger LOGGER = LoggerFactory.getLogger(FlowStage.class); + + /** + * States of the flow stage. + */ + private static final int STAGE_PENDING = 0; // Stage is pending to be started + + private static final int STAGE_ACTIVE = 1; // Stage is actively running + private static final int STAGE_CLOSED = 2; // Stage is closed + private static final int STAGE_STATE = 0b11; // Mask for accessing the state of the flow stage + + /** + * Flags of the flow connection + */ + private static final int STAGE_INVALIDATE = 1 << 2; // The stage is invalidated + + private static final int STAGE_CLOSE = 1 << 3; // The stage should be closed + private static final int STAGE_UPDATE_ACTIVE = 1 << 4; // An update for the connection is active + private static final int STAGE_UPDATE_PENDING = 1 << 5; // An (immediate) update of the connection is pending + + /** + * The flags representing the state and pending actions for the stage. + */ + private int flags = STAGE_PENDING; + + /** + * The deadline of the stage after which an update should run. + */ + long deadline = Long.MAX_VALUE; + + /** + * The index of the timer in the {@link FlowTimerQueue}. + */ + int timerIndex = -1; + + final Clock clock; + private final FlowStageLogic logic; + final FlowGraphInternal parentGraph; + private final FlowEngine engine; + + private final Map<String, InPort> inlets = new HashMap<>(); + private final Map<String, OutPort> outlets = new HashMap<>(); + private int nextInlet = 0; + private int nextOutlet = 0; + + /** + * Construct a new {@link FlowStage} instance. + * + * @param parentGraph The {@link FlowGraph} this stage belongs to. + * @param logic The logic of the stage. + */ + FlowStage(FlowGraphInternal parentGraph, FlowStageLogic logic) { + this.parentGraph = parentGraph; + this.logic = logic; + this.engine = parentGraph.getEngine(); + this.clock = engine.getClock(); + } + + /** + * Return the {@link FlowGraph} to which this stage belongs. + */ + public FlowGraph getGraph() { + return parentGraph; + } + + /** + * Return the {@link Inlet} (an in-going edge) with the specified <code>name</code> for this {@link FlowStage}. + * If an inlet with that name does not exist, a new one is allocated for the stage. + * + * @param name The name of the inlet. + * @return The {@link InPort} representing an {@link Inlet} with the specified <code>name</code>. + */ + public InPort getInlet(String name) { + return inlets.computeIfAbsent(name, (key) -> new InPort(this, key, nextInlet++)); + } + + /** + * Return the {@link Outlet} (an out-going edge) with the specified <code>name</code> for this {@link FlowStage}. + * If an outlet with that name does not exist, a new one is allocated for the stage. + * + * @param name The name of the outlet. + * @return The {@link OutPort} representing an {@link Outlet} with the specified <code>name</code>. + */ + public OutPort getOutlet(String name) { + return outlets.computeIfAbsent(name, (key) -> new OutPort(this, key, nextOutlet++)); + } + + /** + * Return the current deadline of the {@link FlowStage}'s timer (in milliseconds after epoch). + */ + public long getDeadline() { + return deadline; + } + + /** + * Set the deadline of the {@link FlowStage}'s timer. + * + * @param deadline The new deadline (in milliseconds after epoch) when the stage should be interrupted. + */ + public void setDeadline(long deadline) { + this.deadline = deadline; + + if ((flags & STAGE_UPDATE_ACTIVE) == 0) { + // Update the timer queue with the new deadline + engine.scheduleDelayed(this); + } + } + + /** + * Invalidate the {@link FlowStage} forcing the stage to update. + */ + public void invalidate() { + int flags = this.flags; + + if ((flags & STAGE_UPDATE_ACTIVE) == 0) { + scheduleImmediate(clock.millis(), flags | STAGE_INVALIDATE); + } + } + + /** + * Close the {@link FlowStage} and disconnect all inlets and outlets. + */ + public void close() { + int flags = this.flags; + + if ((flags & STAGE_STATE) == STAGE_CLOSED) { + return; + } + + // Toggle the close bit. In case no update is active, schedule a new update. + if ((flags & STAGE_UPDATE_ACTIVE) != 0) { + this.flags = flags | STAGE_CLOSE; + } else { + scheduleImmediate(clock.millis(), flags | STAGE_CLOSE); + } + } + + /** + * Update the state of the flow stage. + * + * @param now The current virtual timestamp. + */ + void onUpdate(long now) { + int flags = this.flags; + int state = flags & STAGE_STATE; + + if (state == STAGE_ACTIVE) { + doUpdate(now, flags); + } else if (state == STAGE_PENDING) { + doStart(now, flags); + } + } + + /** + * Invalidate the {@link FlowStage} forcing the stage to update. + * + * <p> + * This method is similar to {@link #invalidate()}, but allows the user to manually pass the current timestamp to + * prevent having to re-query the clock. This method should not be called during an update. + */ + void invalidate(long now) { + scheduleImmediate(now, flags | STAGE_INVALIDATE); + } + + /** + * Schedule an immediate update for this stage. + */ + private void scheduleImmediate(long now, int flags) { + // In case an immediate update is already scheduled, no need to do anything + if ((flags & STAGE_UPDATE_PENDING) != 0) { + this.flags = flags; + return; + } + + // Mark the stage that there is an update pending + this.flags = flags | STAGE_UPDATE_PENDING; + + engine.scheduleImmediate(now, this); + } + + /** + * Start the stage. + */ + private void doStart(long now, int flags) { + // Update state before calling into the outside world, so it observes a consistent state + flags = flags | STAGE_ACTIVE | STAGE_UPDATE_ACTIVE; + + doUpdate(now, flags); + } + + /** + * Update the state of the stage. + */ + private void doUpdate(long now, int flags) { + long deadline = this.deadline; + long newDeadline = deadline; + + // Update the stage if: + // (1) the timer of the stage has expired. + // (2) one of the input ports is pushed, + // (3) one of the output ports is pulled, + if ((flags & STAGE_INVALIDATE) != 0 || deadline == now) { + // Update state before calling into the outside world, so it observes a consistent state + this.flags = (flags & ~STAGE_INVALIDATE) | STAGE_UPDATE_ACTIVE; + + try { + newDeadline = logic.onUpdate(this, now); + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = this.flags; + } catch (Exception e) { + doFail(e); + } + } + + // Check whether the stage is marked as closing. + if ((flags & STAGE_CLOSE) != 0) { + doClose(flags, null); + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = this.flags; + } + + // Indicate that no update is active anymore and flush the flags + this.flags = flags & ~(STAGE_UPDATE_ACTIVE | STAGE_UPDATE_PENDING); + this.deadline = newDeadline; + + // Update the timer queue with the new deadline + engine.scheduleDelayedInContext(this); + } + + /** + * This method is invoked when an uncaught exception is caught by the engine. When this happens, the + * {@link FlowStageLogic} "fails" and disconnects all its inputs and outputs. + */ + void doFail(Throwable cause) { + LOGGER.warn("Uncaught exception (closing stage)", cause); + + doClose(flags, cause); + } + + /** + * This method is invoked when the {@link FlowStageLogic} exits successfully or due to failure. + */ + private void doClose(int flags, Throwable cause) { + // Mark the stage as closed + this.flags = flags & ~(STAGE_STATE | STAGE_INVALIDATE | STAGE_CLOSE) | STAGE_CLOSED; + + // Remove stage from parent graph + parentGraph.detach(this); + + // Remove stage from the timer queue + setDeadline(Long.MAX_VALUE); + + // Cancel all input ports + for (InPort port : inlets.values()) { + if (port != null) { + port.cancel(cause); + } + } + + // Cancel all output ports + for (OutPort port : outlets.values()) { + if (port != null) { + port.fail(cause); + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java new file mode 100644 index 00000000..70986a35 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageLogic.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * The {@link FlowStageLogic} interface is responsible for describing the behaviour of a {@link FlowStage} via + * out-going flows based on its potential inputs. + */ +public interface FlowStageLogic { + /** + * This method is invoked when the one of the stage's inlets or outlets is invalidated. + * + * @param ctx The context in which the stage runs. + * @param now The virtual timestamp in milliseconds after epoch at which the update is occurring. + * @return The next deadline for the stage. + */ + long onUpdate(FlowStage ctx, long now); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java new file mode 100644 index 00000000..56ec7702 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStageQueue.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.ArrayDeque; +import java.util.Arrays; + +/** + * A specialized {@link ArrayDeque} implementation that contains the {@link FlowStageLogic}s + * that have been updated during the engine cycle and should converge. + * <p> + * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class FlowStageQueue { + /** + * The array of elements in the queue. + */ + private FlowStage[] elements; + + private int head = 0; + private int tail = 0; + + public FlowStageQueue(int initialCapacity) { + elements = new FlowStage[initialCapacity]; + } + + /** + * Add the specified context to the queue. + */ + void add(FlowStage ctx) { + final FlowStage[] es = elements; + int tail = this.tail; + + es[tail] = ctx; + + tail = inc(tail, es.length); + this.tail = tail; + + if (head == tail) { + doubleCapacity(); + } + } + + /** + * Remove a {@link FlowStage} from the queue or <code>null</code> if the queue is empty. + */ + FlowStage poll() { + final FlowStage[] es = elements; + int head = this.head; + FlowStage ctx = es[head]; + + if (ctx != null) { + es[head] = null; + this.head = inc(head, es.length); + } + + return ctx; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + final FlowStage[] es = elements = Arrays.copyOf(elements, newCapacity); + + // Exceptionally, here tail == head needs to be disambiguated + if (tail < head || (tail == head && es[head] != null)) { + // wrap around; slide first leg forward to end of array + int newSpace = newCapacity - oldCapacity; + System.arraycopy(es, head, es, head + newSpace, oldCapacity - head); + for (int i = head, to = (head += newSpace); i < to; i++) es[i] = null; + } + } + + /** + * Circularly increments i, mod modulus. + * Precondition and postcondition: 0 <= i < modulus. + */ + private static int inc(int i, int modulus) { + if (++i >= modulus) i = 0; + return i; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java new file mode 100644 index 00000000..4b746202 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowTimerQueue.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.Arrays; + +/** + * A specialized priority queue for timers of {@link FlowStageLogic}s. + * <p> + * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation + * being generic. + */ +final class FlowTimerQueue { + /** + * Array representation of binary heap of {@link FlowStage} instances. + */ + private FlowStage[] queue; + + /** + * The number of elements in the priority queue. + */ + private int size = 0; + + /** + * Construct a {@link FlowTimerQueue} with the specified initial capacity. + * + * @param initialCapacity The initial capacity of the queue. + */ + public FlowTimerQueue(int initialCapacity) { + this.queue = new FlowStage[initialCapacity]; + } + + /** + * Enqueue a timer for the specified context or update the existing timer. + */ + void enqueue(FlowStage ctx) { + FlowStage[] es = queue; + int k = ctx.timerIndex; + + if (ctx.deadline != Long.MAX_VALUE) { + if (k >= 0) { + update(es, ctx, k); + } else { + add(es, ctx); + } + } else if (k >= 0) { + delete(es, k); + } + } + + /** + * Retrieve the head of the queue if its deadline does not exceed <code>now</code>. + * + * @param now The timestamp that the deadline of the head of the queue should not exceed. + * @return The head of the queue if its deadline does not exceed <code>now</code>, otherwise <code>null</code>. + */ + FlowStage poll(long now) { + int size = this.size; + if (size == 0) { + return null; + } + + final FlowStage[] es = queue; + final FlowStage head = es[0]; + + if (now < head.deadline) { + return null; + } + + int n = size - 1; + this.size = n; + final FlowStage next = es[n]; + es[n] = null; // Clear the last element of the queue + + if (n > 0) { + siftDown(0, next, es, n); + } + + head.timerIndex = -1; + return head; + } + + /** + * Find the earliest deadline in the queue. + */ + long peekDeadline() { + if (size > 0) { + return queue[0].deadline; + } + + return Long.MAX_VALUE; + } + + /** + * Add a new entry to the queue. + */ + private void add(FlowStage[] es, FlowStage ctx) { + int i = size; + + if (i >= es.length) { + // Re-fetch the resized array + es = grow(); + } + + siftUp(i, ctx, es); + + size = i + 1; + } + + /** + * Update the deadline of an existing entry in the queue. + */ + private void update(FlowStage[] es, FlowStage ctx, int k) { + if (k > 0) { + int parent = (k - 1) >>> 1; + if (es[parent].deadline > ctx.deadline) { + siftUp(k, ctx, es); + return; + } + } + + siftDown(k, ctx, es, size); + } + + /** + * Deadline an entry from the queue. + */ + private void delete(FlowStage[] es, int k) { + int s = --size; + if (s == k) { + es[k] = null; // Element is last in the queue + } else { + FlowStage moved = es[s]; + es[s] = null; + + siftDown(k, moved, es, s); + + if (es[k] == moved) { + siftUp(k, moved, es); + } + } + } + + /** + * Increases the capacity of the array. + */ + private FlowStage[] grow() { + FlowStage[] queue = this.queue; + int oldCapacity = queue.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + + queue = Arrays.copyOf(queue, newCapacity); + this.queue = queue; + return queue; + } + + private static void siftUp(int k, FlowStage key, FlowStage[] es) { + while (k > 0) { + int parent = (k - 1) >>> 1; + FlowStage e = es[parent]; + if (key.deadline >= e.deadline) break; + es[k] = e; + e.timerIndex = k; + k = parent; + } + es[k] = key; + key.timerIndex = k; + } + + private static void siftDown(int k, FlowStage key, FlowStage[] es, int n) { + int half = n >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + FlowStage c = es[child]; + int right = child + 1; + if (right < n && c.deadline > es[right].deadline) c = es[child = right]; + + if (key.deadline <= c.deadline) break; + + es[k] = c; + c.timerIndex = k; + k = child; + } + + es[k] = key; + key.timerIndex = k; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java new file mode 100644 index 00000000..839b01db --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * Collection of callbacks for the input port (a {@link InPort}) of a {@link FlowStageLogic}. + */ +public interface InHandler { + /** + * Return the actual flow rate over the input port. + * + * @param port The input port to which the flow was pushed. + * @return The actual flow rate over the port. + */ + default float getRate(InPort port) { + return Math.min(port.getDemand(), port.getCapacity()); + } + + /** + * This method is invoked when another {@link FlowStageLogic} changes the rate of flow to the specified inlet. + * + * @param port The input port to which the flow was pushed. + * @param demand The rate of flow the output attempted to push to the port. + */ + void onPush(InPort port, float demand); + + /** + * This method is invoked when the input port is finished. + * + * @param port The input port that has finished. + * @param cause The cause of the input port being finished or <code>null</code> if the port completed successfully. + */ + void onUpstreamFinish(InPort port, Throwable cause); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java index b3191ad3..9d5b4bef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InHandlers.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,33 +20,34 @@ * SOFTWARE. */ -package org.opendc.simulator.flow.source +package org.opendc.simulator.flow2; /** - * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to - * finish a pull, before proceeding its operation. + * A collection of common {@link InHandler} implementations. */ -public class FlowSourceBarrier(public val parties: Int) { - private var counter = 0 +public class InHandlers { + /** + * Prevent construction of this class. + */ + private InHandlers() {} /** - * Enter the barrier and determine whether the caller is the last to reach the barrier. - * - * @return `true` if the caller is the last to reach the barrier, `false` otherwise. + * Return an {@link InHandler} that does nothing. */ - public fun enter(): Boolean { - val last = ++counter == parties - if (last) { - counter = 0 - return true - } - return false + public static InHandler noop() { + return NoopInHandler.INSTANCE; } /** - * Reset the barrier. + * No-op implementation of {@link InHandler}. */ - public fun reset() { - counter = 0 + private static final class NoopInHandler implements InHandler { + public static final InHandler INSTANCE = new NoopInHandler(); + + @Override + public void onPush(InPort port, float demand) {} + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) {} } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java new file mode 100644 index 00000000..fba12aaf --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java @@ -0,0 +1,214 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.Objects; + +/** + * A port that consumes a flow. + * <p> + * Input ports are represented as in-going edges in the flow graph. + */ +public final class InPort implements Inlet { + private final int id; + + private float capacity; + private float demand; + + private boolean mask; + + OutPort output; + private InHandler handler = InHandlers.noop(); + private final Clock clock; + private final String name; + private final FlowStage stage; + + InPort(FlowStage stage, String name, int id) { + this.name = name; + this.id = id; + this.stage = stage; + this.clock = stage.clock; + } + + @Override + public FlowGraph getGraph() { + return stage.parentGraph; + } + + @Override + public String getName() { + return name; + } + + /** + * Return the identifier of the {@link InPort} with respect to its stage. + */ + public int getId() { + return id; + } + + /** + * Return the current capacity of the input port. + */ + public float getCapacity() { + return capacity; + } + + /** + * Return the current demand of flow of the input port. + */ + public float getDemand() { + return demand; + } + + /** + * Return the current rate of flow of the input port. + */ + public float getRate() { + return handler.getRate(this); + } + + /** + * Pull the flow with the specified <code>capacity</code> from the input port. + * + * @param capacity The maximum throughput that the stage can receive from the input port. + */ + public void pull(float capacity) { + this.capacity = capacity; + + OutPort output = this.output; + if (output != null) { + output.pull(capacity); + } + } + + /** + * Return the current {@link InHandler} of the input port. + */ + public InHandler getHandler() { + return handler; + } + + /** + * Set the {@link InHandler} of the input port. + */ + public void setHandler(InHandler handler) { + this.handler = handler; + } + + /** + * Return the mask of this port. + * <p> + * Stages ignore events originating from masked ports. + */ + public boolean getMask() { + return mask; + } + + /** + * (Un)mask the port. + */ + public void setMask(boolean mask) { + this.mask = mask; + } + + /** + * Disconnect the input port from its (potentially) connected outlet. + * <p> + * The inlet can still be used and re-connected to another outlet. + * + * @param cause The cause for disconnecting the port or <code>null</code> when no more flow is needed. + */ + public void cancel(Throwable cause) { + demand = 0.f; + + OutPort output = this.output; + if (output != null) { + this.output = null; + output.input = null; + output.cancel(cause); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + InPort port = (InPort) o; + return stage.equals(port.stage) && name.equals(port.name); + } + + @Override + public int hashCode() { + return Objects.hash(stage.parentGraph, name); + } + + /** + * This method is invoked when the inlet is connected to an outlet. + */ + void connect() { + OutPort output = this.output; + output.pull(capacity); + } + + /** + * Push a flow from an outlet to this inlet. + * + * @param demand The rate of flow to push. + */ + void push(float demand) { + // No-op when the rate is unchanged + if (this.demand == demand) { + return; + } + + try { + handler.onPush(this, demand); + this.demand = demand; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } + + /** + * This method is invoked by the connected {@link OutPort} when it finishes. + */ + void finish(Throwable cause) { + try { + long now = clock.millis(); + handler.onUpstreamFinish(this, cause); + this.demand = 0.f; + + if (!mask) { + stage.invalidate(now); + } + } catch (Exception e) { + stage.doFail(e); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java index 62cb10d1..4a9ea6a5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Inlet.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,16 +20,19 @@ * SOFTWARE. */ -package org.opendc.simulator.flow +package org.opendc.simulator.flow2; /** - * A listener interface for when a flow stage has converged into a steady-state. + * An in-going edge in a {@link FlowGraph}. */ -public interface FlowConvergenceListener { +public interface Inlet { /** - * This method is invoked when the system has converged to a steady-state. - * - * @param now The timestamp at which the system converged. + * Return the {@link FlowGraph} to which the inlet is exposed. */ - public fun onConverge(now: Long) {} + FlowGraph getGraph(); + + /** + * Return the name of the inlet. + */ + String getName(); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java new file mode 100644 index 00000000..a5b5114b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InvocationStack.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.util.Arrays; + +/** + * A specialized monotonic stack implementation for tracking the scheduled engine invocations. + * <p> + * By using a specialized class, we reduce the overhead caused by type-erasure. + */ +final class InvocationStack { + /** + * The array of elements in the stack. + */ + private long[] elements; + + private int head = -1; + + public InvocationStack(int initialCapacity) { + elements = new long[initialCapacity]; + Arrays.fill(elements, Long.MIN_VALUE); + } + + /** + * Try to add the specified invocation to the monotonic stack. + * + * @param invocation The timestamp of the invocation. + * @return <code>true</code> if the invocation was added, <code>false</code> otherwise. + */ + boolean tryAdd(long invocation) { + final long[] es = elements; + int head = this.head; + + if (head < 0 || es[head] > invocation) { + es[head + 1] = invocation; + this.head = head + 1; + + if (head + 2 == es.length) { + doubleCapacity(); + } + + return true; + } + + return false; + } + + /** + * Remove the head invocation from the stack or return {@link Long#MAX_VALUE} if the stack is empty. + */ + long poll() { + final long[] es = elements; + int head = this.head--; + + if (head >= 0) { + return es[head]; + } + + return Long.MAX_VALUE; + } + + /** + * Doubles the capacity of this deque + */ + private void doubleCapacity() { + int oldCapacity = elements.length; + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity < 0) { + throw new IllegalStateException("Sorry, deque too big"); + } + + elements = Arrays.copyOf(elements, newCapacity); + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java index 98922ab3..723c6d6b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,43 +20,28 @@ * SOFTWARE. */ -package org.opendc.simulator.flow +package org.opendc.simulator.flow2; /** - * A controllable [FlowConnection]. - * - * This interface is used by [FlowConsumer]s to control the connection between it and the source. + * Collection of callbacks for the output port (a {@link OutPort}) of a {@link FlowStageLogic}. */ -public interface FlowConsumerContext : FlowConnection { - /** - * The deadline of the source. - */ - public val deadline: Long - - /** - * The capacity of the connection. - */ - public override var capacity: Double - +public interface OutHandler { /** - * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer. - */ - public var shouldConsumerConverge: Boolean - - /** - * A flag to control whether the timers for the [FlowSource] should be enabled. - */ - public var enableTimers: Boolean - - /** - * Start the flow over the connection. + * This method is invoked when another {@link FlowStageLogic} changes the capacity of the outlet. + * + * @param port The output port of which the capacity was changed. + * @param capacity The new capacity of the outlet. */ - public fun start() + void onPull(OutPort port, float capacity); /** - * Synchronously pull the source of the connection. + * This method is invoked when the output port no longer accepts any flow. + * <p> + * After this callback no other callbacks will be called for this port. * - * @param now The timestamp at which the connection is pulled. + * @param port The outlet that no longer accepts any flow. + * @param cause The cause of the output port no longer accepting any flow or <code>null</code> if the port closed + * successfully. */ - public fun pullSync(now: Long) + void onDownstreamFinish(OutPort port, Throwable cause); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java new file mode 100644 index 00000000..8fbfda0d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutHandlers.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * A collection of common {@link OutHandler} implementations. + */ +public class OutHandlers { + /** + * Prevent construction of this class. + */ + private OutHandlers() {} + + /** + * Return an {@link OutHandler} that does nothing. + */ + public static OutHandler noop() { + return NoopOutHandler.INSTANCE; + } + + /** + * No-op implementation of {@link OutHandler}. + */ + private static final class NoopOutHandler implements OutHandler { + public static final OutHandler INSTANCE = new NoopOutHandler(); + + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) {} + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java new file mode 100644 index 00000000..332296a0 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +import java.time.Clock; +import java.util.Objects; + +/** + * A port that outputs a flow. + * <p> + * Output ports are represented as out-going edges in the flow graph. + */ +public final class OutPort implements Outlet { + private final int id; + + private float capacity; + private float demand; + + private boolean mask; + + InPort input; + private OutHandler handler = OutHandlers.noop(); + private final String name; + private final FlowStage stage; + private final Clock clock; + + OutPort(FlowStage stage, String name, int id) { + this.name = name; + this.id = id; + this.stage = stage; + this.clock = stage.clock; + } + + @Override + public FlowGraph getGraph() { + return stage.parentGraph; + } + + @Override + public String getName() { + return name; + } + + /** + * Return the identifier of the {@link OutPort} with respect to its stage. + */ + public int getId() { + return id; + } + + /** + * Return the capacity of the output port. + */ + public float getCapacity() { + return capacity; + } + + /** + * Return the current demand of flow of the output port. + */ + public float getDemand() { + return demand; + } + + /** + * Return the current rate of flow of the input port. + */ + public float getRate() { + InPort input = this.input; + if (input != null) { + return input.getRate(); + } + + return 0.f; + } + + /** + * Return the current {@link OutHandler} of the output port. + */ + public OutHandler getHandler() { + return handler; + } + + /** + * Set the {@link OutHandler} of the output port. + */ + public void setHandler(OutHandler handler) { + this.handler = handler; + } + + /** + * Return the mask of this port. + * <p> + * Stages ignore events originating from masked ports. + */ + public boolean getMask() { + return mask; + } + + /** + * (Un)mask the port. + */ + public void setMask(boolean mask) { + this.mask = mask; + } + + /** + * Push the given flow rate over output port. + * + * @param rate The rate of the flow to push. + */ + public void push(float rate) { + demand = rate; + InPort input = this.input; + + if (input != null) { + input.push(rate); + } + } + + /** + * Signal to the downstream port that the output has completed successfully and disconnect the port from its input. + * <p> + * The output port can still be used and re-connected to another input. + */ + public void complete() { + fail(null); + } + + /** + * Signal a failure to the downstream port and disconnect the port from its input. + * <p> + * The output can still be used and re-connected to another input. + */ + public void fail(Throwable cause) { + capacity = 0.f; + + InPort input = this.input; + if (input != null) { + this.input = null; + input.output = null; + input.finish(cause); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OutPort port = (OutPort) o; + return stage.equals(port.stage) && name.equals(port.name); + } + + @Override + public int hashCode() { + return Objects.hash(stage.parentGraph, name); + } + + /** + * This method is invoked when the outlet is connected to an inlet. + */ + void connect() { + input.push(demand); + } + + /** + * Pull from this outlet with a specified capacity. + * + * @param capacity The capacity of the inlet. + */ + void pull(float capacity) { + // No-op when outlet is not active or the rate is unchanged + if (this.capacity == capacity) { + return; + } + + try { + handler.onPull(this, capacity); + this.capacity = capacity; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } + + /** + * This method is invoked by the connected {@link InPort} when downstream cancels the connection. + */ + void cancel(Throwable cause) { + try { + handler.onDownstreamFinish(this, cause); + this.capacity = 0.f; + + if (!mask) { + stage.invalidate(clock.millis()); + } + } catch (Exception e) { + stage.doFail(e); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java new file mode 100644 index 00000000..32e19a3b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/Outlet.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2; + +/** + * An out-going edge in a {@link FlowGraph}. + */ +public interface Outlet { + /** + * Return the {@link FlowGraph} to which the outlet is exposed. + */ + FlowGraph getGraph(); + + /** + * Return the name of the outlet. + */ + String getName(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java new file mode 100644 index 00000000..dec98955 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexer.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowStageLogic} that multiplexes multiple inputs over (possibly) multiple outputs. + */ +public interface FlowMultiplexer { + /** + * Return maximum number of inputs supported by the multiplexer. + */ + int getMaxInputs(); + + /** + * Return maximum number of outputs supported by the multiplexer. + */ + int getMaxOutputs(); + + /** + * Return the number of active inputs on this multiplexer. + */ + int getInputCount(); + + /** + * Allocate a new input on this multiplexer with the specified capacity.. + * + * @return The identifier of the input for this stage. + */ + Inlet newInput(); + + /** + * Release the input at the specified slot. + * + * @param inlet The inlet to release. + */ + void releaseInput(Inlet inlet); + + /** + * Return the number of active outputs on this multiplexer. + */ + int getOutputCount(); + + /** + * Allocate a new output on this multiplexer. + * + * @return The outlet for this stage. + */ + Outlet newOutput(); + + /** + * Release the output at the specified slot. + * + * @param outlet The outlet to release. + */ + void releaseOutput(Outlet outlet); + + /** + * Return the total input capacity of the {@link FlowMultiplexer}. + */ + float getCapacity(); + + /** + * Return the total input demand for the {@link FlowMultiplexer}. + */ + float getDemand(); + + /** + * Return the total input rate for the {@link FlowMultiplexer}. + */ + float getRate(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java new file mode 100644 index 00000000..0b5b9141 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/FlowMultiplexerFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import org.opendc.simulator.flow2.FlowGraph; + +/** + * Factory interface for a {@link FlowMultiplexer} implementation. + */ +public interface FlowMultiplexerFactory { + /** + * Construct a new {@link FlowMultiplexer} belonging to the specified {@link FlowGraph}. + * + * @param graph The graph to which the multiplexer belongs. + */ + FlowMultiplexer newMultiplexer(FlowGraph graph); + + /** + * Return a {@link FlowMultiplexerFactory} for {@link ForwardingFlowMultiplexer} instances. + */ + static FlowMultiplexerFactory forwardingMultiplexer() { + return ForwardingFlowMultiplexer.FACTORY; + } + + /** + * Return a {@link FlowMultiplexerFactory} for {@link MaxMinFlowMultiplexer} instances. + */ + static FlowMultiplexerFactory maxMinMultiplexer() { + return MaxMinFlowMultiplexer.FACTORY; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java new file mode 100644 index 00000000..abe3510b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexer.java @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import java.util.Arrays; +import java.util.BitSet; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowMultiplexer} implementation that allocates inputs to the outputs of the multiplexer exclusively. + * This means that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + */ +public final class ForwardingFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + /** + * Factory implementation for this implementation. + */ + static FlowMultiplexerFactory FACTORY = ForwardingFlowMultiplexer::new; + + public final IdleInHandler IDLE_IN_HANDLER = new IdleInHandler(); + public final IdleOutHandler IDLE_OUT_HANDLER = new IdleOutHandler(); + + private final FlowStage stage; + + private InPort[] inlets; + private OutPort[] outlets; + private final BitSet activeInputs; + private final BitSet activeOutputs; + private final BitSet availableOutputs; + + private float capacity = 0.f; + private float demand = 0.f; + + public ForwardingFlowMultiplexer(FlowGraph graph) { + this.stage = graph.newStage(this); + + this.inlets = new InPort[4]; + this.activeInputs = new BitSet(); + this.outlets = new OutPort[4]; + this.activeOutputs = new BitSet(); + this.availableOutputs = new BitSet(); + } + + @Override + public float getCapacity() { + return capacity; + } + + @Override + public float getDemand() { + return demand; + } + + @Override + public float getRate() { + final BitSet activeOutputs = this.activeOutputs; + final OutPort[] outlets = this.outlets; + float rate = 0.f; + for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { + rate += outlets[i].getRate(); + } + return rate; + } + + @Override + public int getMaxInputs() { + return getOutputCount(); + } + + @Override + public int getMaxOutputs() { + return Integer.MAX_VALUE; + } + + @Override + public int getInputCount() { + return activeInputs.length(); + } + + @Override + public Inlet newInput() { + final BitSet activeInputs = this.activeInputs; + int slot = activeInputs.nextClearBit(0); + + InPort inPort = stage.getInlet("in" + slot); + inPort.setMask(true); + + InPort[] inlets = this.inlets; + if (slot >= inlets.length) { + int newLength = inlets.length + (inlets.length >> 1); + inlets = Arrays.copyOf(inlets, newLength); + this.inlets = inlets; + } + + final BitSet availableOutputs = this.availableOutputs; + int outSlot = availableOutputs.nextSetBit(0); + + if (outSlot < 0) { + throw new IllegalStateException("No capacity available for a new input"); + } + + inlets[slot] = inPort; + activeInputs.set(slot); + + OutPort outPort = outlets[outSlot]; + availableOutputs.clear(outSlot); + + inPort.setHandler(new ForwardingInHandler(outPort)); + outPort.setHandler(new ForwardingOutHandler(inPort)); + + inPort.pull(outPort.getCapacity()); + + return inPort; + } + + @Override + public void releaseInput(Inlet inlet) { + InPort port = (InPort) inlet; + int slot = port.getId(); + + final BitSet activeInputs = this.activeInputs; + + if (!activeInputs.get(slot)) { + return; + } + + port.cancel(null); + activeInputs.clear(slot); + + ForwardingInHandler inHandler = (ForwardingInHandler) port.getHandler(); + availableOutputs.set(inHandler.output.getId()); + + port.setHandler(IDLE_IN_HANDLER); + } + + @Override + public int getOutputCount() { + return activeOutputs.length(); + } + + @Override + public Outlet newOutput() { + final BitSet activeOutputs = this.activeOutputs; + int slot = activeOutputs.nextClearBit(0); + + OutPort port = stage.getOutlet("out" + slot); + OutPort[] outlets = this.outlets; + if (slot >= outlets.length) { + int newLength = outlets.length + (outlets.length >> 1); + outlets = Arrays.copyOf(outlets, newLength); + this.outlets = outlets; + } + outlets[slot] = port; + + activeOutputs.set(slot); + availableOutputs.set(slot); + return port; + } + + @Override + public void releaseOutput(Outlet outlet) { + OutPort port = (OutPort) outlet; + int slot = port.getId(); + activeInputs.clear(slot); + availableOutputs.clear(slot); + port.complete(); + + port.setHandler(IDLE_OUT_HANDLER); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } + + class ForwardingInHandler implements InHandler { + final OutPort output; + + ForwardingInHandler(OutPort output) { + this.output = output; + } + + @Override + public float getRate(InPort port) { + return output.getRate(); + } + + @Override + public void onPush(InPort port, float rate) { + ForwardingFlowMultiplexer.this.demand += -port.getDemand() + rate; + + output.push(rate); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + ForwardingFlowMultiplexer.this.demand -= port.getDemand(); + + final OutPort output = this.output; + output.push(0.f); + + releaseInput(port); + } + } + + private class ForwardingOutHandler implements OutHandler { + private final InPort input; + + ForwardingOutHandler(InPort input) { + this.input = input; + } + + @Override + public void onPull(OutPort port, float capacity) { + ForwardingFlowMultiplexer.this.capacity += -port.getCapacity() + capacity; + + input.pull(capacity); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + ForwardingFlowMultiplexer.this.capacity -= port.getCapacity(); + + input.cancel(cause); + + releaseOutput(port); + } + } + + private static class IdleInHandler implements InHandler { + @Override + public float getRate(InPort port) { + return 0.f; + } + + @Override + public void onPush(InPort port, float rate) { + port.cancel(new IllegalStateException("Inlet is not allocated")); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) {} + } + + private static class IdleOutHandler implements OutHandler { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) {} + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java new file mode 100644 index 00000000..ac5c4f5c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexer.java @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux; + +import java.util.Arrays; +import java.util.BitSet; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowMultiplexer} implementation that distributes the available capacity of the outputs over the inputs + * using max-min fair sharing. + * <p> + * The max-min fair sharing algorithm of this multiplexer ensures that each input receives a fair share of the combined + * output capacity, but allows individual inputs to use more capacity if there is still capacity left. + */ +public final class MaxMinFlowMultiplexer implements FlowMultiplexer, FlowStageLogic { + /** + * Factory implementation for this implementation. + */ + static FlowMultiplexerFactory FACTORY = MaxMinFlowMultiplexer::new; + + private final FlowStage stage; + private final BitSet activeInputs; + private final BitSet activeOutputs; + + private float capacity = 0.f; + private float demand = 0.f; + private float rate = 0.f; + + private InPort[] inlets; + private long[] inputs; + private float[] rates; + private OutPort[] outlets; + + private final MultiplexerInHandler inHandler = new MultiplexerInHandler(); + private final MultiplexerOutHandler outHandler = new MultiplexerOutHandler(); + + /** + * Construct a {@link MaxMinFlowMultiplexer} instance. + * + * @param graph The {@link FlowGraph} to add the multiplexer to. + */ + public MaxMinFlowMultiplexer(FlowGraph graph) { + this.stage = graph.newStage(this); + this.activeInputs = new BitSet(); + this.activeOutputs = new BitSet(); + + this.inlets = new InPort[4]; + this.inputs = new long[4]; + this.rates = new float[4]; + this.outlets = new OutPort[4]; + } + + @Override + public float getCapacity() { + return capacity; + } + + @Override + public float getDemand() { + return demand; + } + + @Override + public float getRate() { + return rate; + } + + @Override + public int getMaxInputs() { + return Integer.MAX_VALUE; + } + + @Override + public int getMaxOutputs() { + return Integer.MAX_VALUE; + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + float capacity = this.capacity; + float demand = this.demand; + float rate = demand; + + if (demand > capacity) { + rate = redistributeCapacity(inlets, inputs, rates, capacity); + } + + if (this.rate != rate) { + // Only update the outputs if the output rate has changed + this.rate = rate; + + changeRate(activeOutputs, outlets, capacity, rate); + } + + return Long.MAX_VALUE; + } + + @Override + public int getInputCount() { + return activeInputs.length(); + } + + @Override + public Inlet newInput() { + final BitSet activeInputs = this.activeInputs; + int slot = activeInputs.nextClearBit(0); + + InPort port = stage.getInlet("in" + slot); + port.setHandler(inHandler); + port.pull(this.capacity); + + InPort[] inlets = this.inlets; + if (slot >= inlets.length) { + int newLength = inlets.length + (inlets.length >> 1); + inlets = Arrays.copyOf(inlets, newLength); + inputs = Arrays.copyOf(inputs, newLength); + rates = Arrays.copyOf(rates, newLength); + this.inlets = inlets; + } + inlets[slot] = port; + + activeInputs.set(slot); + return port; + } + + @Override + public void releaseInput(Inlet inlet) { + InPort port = (InPort) inlet; + + activeInputs.clear(port.getId()); + port.cancel(null); + } + + @Override + public int getOutputCount() { + return activeOutputs.length(); + } + + @Override + public Outlet newOutput() { + final BitSet activeOutputs = this.activeOutputs; + int slot = activeOutputs.nextClearBit(0); + + OutPort port = stage.getOutlet("out" + slot); + port.setHandler(outHandler); + + OutPort[] outlets = this.outlets; + if (slot >= outlets.length) { + int newLength = outlets.length + (outlets.length >> 1); + outlets = Arrays.copyOf(outlets, newLength); + this.outlets = outlets; + } + outlets[slot] = port; + + activeOutputs.set(slot); + return port; + } + + @Override + public void releaseOutput(Outlet outlet) { + OutPort port = (OutPort) outlet; + activeInputs.clear(port.getId()); + port.complete(); + } + + /** + * Helper function to redistribute the specified capacity across the inlets. + */ + private static float redistributeCapacity(InPort[] inlets, long[] inputs, float[] rates, float capacity) { + // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the + // constrained capacity across the inputs. + for (int i = 0; i < inputs.length; i++) { + InPort inlet = inlets[i]; + if (inlet == null) { + break; + } + + inputs[i] = ((long) Float.floatToRawIntBits(inlet.getDemand()) << 32) | (i & 0xFFFFFFFFL); + } + Arrays.sort(inputs); + + float availableCapacity = capacity; + int inputSize = inputs.length; + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + for (int i = 0; i < inputs.length; i++) { + long v = inputs[i]; + int slot = (int) v; + float d = Float.intBitsToFloat((int) (v >> 32)); + + if (d == 0.0) { + continue; + } + + float availableShare = availableCapacity / (inputSize - i); + float r = Math.min(d, availableShare); + + rates[slot] = r; + availableCapacity -= r; + } + + return capacity - availableCapacity; + } + + /** + * Helper method to change the rate of the outlets. + */ + private static void changeRate(BitSet activeOutputs, OutPort[] outlets, float capacity, float rate) { + // Divide the requests over the available capacity of the input resources fairly + for (int i = activeOutputs.nextSetBit(0); i != -1; i = activeOutputs.nextSetBit(i + 1)) { + OutPort outlet = outlets[i]; + float fraction = outlet.getCapacity() / capacity; + outlet.push(rate * fraction); + } + } + + /** + * A {@link InHandler} implementation for the multiplexer inputs. + */ + private class MultiplexerInHandler implements InHandler { + @Override + public float getRate(InPort port) { + return rates[port.getId()]; + } + + @Override + public void onPush(InPort port, float demand) { + MaxMinFlowMultiplexer.this.demand += -port.getDemand() + demand; + rates[port.getId()] = demand; + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + MaxMinFlowMultiplexer.this.demand -= port.getDemand(); + releaseInput(port); + rates[port.getId()] = 0.f; + } + } + + /** + * A {@link OutHandler} implementation for the multiplexer outputs. + */ + private class MultiplexerOutHandler implements OutHandler { + @Override + public void onPull(OutPort port, float capacity) { + float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity() + capacity; + MaxMinFlowMultiplexer.this.capacity = newCapacity; + changeInletCapacity(newCapacity); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + float newCapacity = MaxMinFlowMultiplexer.this.capacity - port.getCapacity(); + MaxMinFlowMultiplexer.this.capacity = newCapacity; + releaseOutput(port); + changeInletCapacity(newCapacity); + } + + private void changeInletCapacity(float capacity) { + BitSet activeInputs = MaxMinFlowMultiplexer.this.activeInputs; + InPort[] inlets = MaxMinFlowMultiplexer.this.inlets; + + for (int i = activeInputs.nextSetBit(0); i != -1; i = activeInputs.nextSetBit(i + 1)) { + inlets[i].pull(capacity); + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java index 450195ec..69c94708 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/FlowSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,9 +20,17 @@ * SOFTWARE. */ -package org.opendc.simulator.flow.internal +package org.opendc.simulator.flow2.sink; + +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.Inlet; /** - * Constant for converting milliseconds into seconds. + * A {@link FlowStage} with a single input. */ -internal const val D_MS_TO_S = 1 / 1000.0 +public interface FlowSink { + /** + * Return the input of this {@link FlowSink}. + */ + Inlet getInput(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java new file mode 100644 index 00000000..fdfe5ee8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/sink/SimpleFlowSink.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.InHandler; +import org.opendc.simulator.flow2.InPort; +import org.opendc.simulator.flow2.Inlet; + +/** + * A sink with a fixed capacity. + */ +public final class SimpleFlowSink implements FlowSink, FlowStageLogic { + private final FlowStage stage; + private final InPort input; + private final Handler handler; + + /** + * Construct a new {@link SimpleFlowSink} with the specified initial capacity. + * + * @param graph The graph to add the sink to. + * @param initialCapacity The initial capacity of the sink. + */ + public SimpleFlowSink(FlowGraph graph, float initialCapacity) { + this.stage = graph.newStage(this); + this.handler = new Handler(); + this.input = stage.getInlet("in"); + this.input.pull(initialCapacity); + this.input.setMask(true); + this.input.setHandler(handler); + } + + /** + * Return the {@link Inlet} of this sink. + */ + @Override + public Inlet getInput() { + return input; + } + + /** + * Return the capacity of the sink. + */ + public float getCapacity() { + return input.getCapacity(); + } + + /** + * Update the capacity of the sink. + * + * @param capacity The new capacity to update the sink to. + */ + public void setCapacity(float capacity) { + input.pull(capacity); + stage.invalidate(); + } + + /** + * Return the flow rate of the sink. + */ + public float getRate() { + return input.getRate(); + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + InPort input = this.input; + handler.rate = Math.min(input.getDemand(), input.getCapacity()); + return Long.MAX_VALUE; + } + + /** + * The {@link InHandler} implementation for the sink. + */ + private static final class Handler implements InHandler { + float rate; + + @Override + public float getRate(InPort port) { + return rate; + } + + @Override + public void onPush(InPort port, float demand) { + float capacity = port.getCapacity(); + rate = Math.min(demand, capacity); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + rate = 0.f; + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java index 8ff0bc76..2dcc66e4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/EmptyFlowSource.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,53 +20,46 @@ * SOFTWARE. */ -package org.opendc.simulator.flow +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; /** - * An active connection between a [FlowSource] and [FlowConsumer]. + * An empty {@link FlowSource}. */ -public interface FlowConnection : AutoCloseable { - /** - * The capacity of the connection. - */ - public val capacity: Double - - /** - * The flow rate over the connection. - */ - public val rate: Double - - /** - * The flow demand of the source. - */ - public val demand: Double +public final class EmptyFlowSource implements FlowSource, FlowStageLogic { + private final FlowStage stage; + private final OutPort output; /** - * A flag to control whether [FlowSource.onConverge] should be invoked for this source. + * Construct a new {@link EmptyFlowSource}. */ - public var shouldSourceConverge: Boolean + public EmptyFlowSource(FlowGraph graph) { + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + } /** - * Pull the source. + * Return the {@link Outlet} of the source. */ - public fun pull() + @Override + public Outlet getOutput() { + return output; + } /** - * Pull the source. - * - * @param now The timestamp at which the connection is pulled. + * Remove this node from the graph. */ - public fun pull(now: Long) + public void close() { + stage.close(); + } - /** - * Push the given flow [rate] over this connection. - * - * @param rate The rate of the flow to push. - */ - public fun push(rate: Double) - - /** - * Disconnect the consumer from its source. - */ - public override fun close() + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java new file mode 100644 index 00000000..f9432c33 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/FlowSource.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowStage} with a single output. + */ +public interface FlowSource { + /** + * Return the output of this {@link FlowSource}. + */ + Outlet getOutput(); +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java new file mode 100644 index 00000000..c09987cd --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/RuntimeFlowSource.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import java.util.function.Consumer; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A {@link FlowSource} that ensures a flow is emitted for a specified amount of time at some utilization. + */ +public class RuntimeFlowSource implements FlowSource, FlowStageLogic { + private final float utilization; + + private final FlowStage stage; + private final OutPort output; + private final Consumer<RuntimeFlowSource> completionHandler; + + private long duration; + private long lastPull; + + /** + * Construct a {@link RuntimeFlowSource} instance. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param duration The duration of the source. + * @param utilization The utilization of the capacity of the outlet. + * @param completionHandler A callback invoked when the source completes. + */ + public RuntimeFlowSource( + FlowGraph graph, long duration, float utilization, Consumer<RuntimeFlowSource> completionHandler) { + if (duration <= 0) { + throw new IllegalArgumentException("Duration must be positive and non-zero"); + } + + if (utilization <= 0.0) { + throw new IllegalArgumentException("Utilization must be positive and non-zero"); + } + + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(RuntimeFlowSource.this); + } + }); + this.duration = duration; + this.utilization = utilization; + this.completionHandler = completionHandler; + this.lastPull = graph.getEngine().getClock().millis(); + } + + /** + * Construct a new {@link RuntimeFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param duration The duration of the source. + * @param utilization The utilization of the capacity of the outlet. + */ + public RuntimeFlowSource(FlowGraph graph, long duration, float utilization) { + this(graph, duration, utilization, RuntimeFlowSource::close); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + long lastPull = this.lastPull; + this.lastPull = now; + + long delta = Math.max(0, now - lastPull); + + OutPort output = this.output; + float limit = output.getCapacity() * utilization; + long duration = this.duration - delta; + + if (duration <= 0) { + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + this.duration = duration; + output.push(limit); + return now + duration; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java new file mode 100644 index 00000000..a0e9cb9d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/SimpleFlowSource.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import java.util.function.Consumer; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A flow source that contains a fixed amount and is pushed with a given utilization. + */ +public final class SimpleFlowSource implements FlowSource, FlowStageLogic { + private final float utilization; + private float remainingAmount; + private long lastPull; + + private final FlowStage stage; + private final OutPort output; + private final Consumer<SimpleFlowSource> completionHandler; + + /** + * Construct a new {@link SimpleFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param amount The amount to transfer via the outlet. + * @param utilization The utilization of the capacity of the outlet. + * @param completionHandler A callback invoked when the source completes. + */ + public SimpleFlowSource( + FlowGraph graph, float amount, float utilization, Consumer<SimpleFlowSource> completionHandler) { + if (amount < 0.0) { + throw new IllegalArgumentException("Amount must be non-negative"); + } + + if (utilization <= 0.0) { + throw new IllegalArgumentException("Utilization must be positive and non-zero"); + } + + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(SimpleFlowSource.this); + } + }); + this.completionHandler = completionHandler; + this.utilization = utilization; + this.remainingAmount = amount; + this.lastPull = graph.getEngine().getClock().millis(); + } + + /** + * Construct a new {@link SimpleFlowSource}. + * + * @param graph The {@link FlowGraph} to which this source belongs. + * @param amount The amount to transfer via the outlet. + * @param utilization The utilization of the capacity of the outlet. + */ + public SimpleFlowSource(FlowGraph graph, float amount, float utilization) { + this(graph, amount, utilization, SimpleFlowSource::close); + } + + /** + * Return the {@link Outlet} of the source. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + long lastPull = this.lastPull; + this.lastPull = now; + + long delta = Math.max(0, now - lastPull); + + OutPort output = this.output; + float consumed = output.getRate() * delta / 1000.f; + float limit = output.getCapacity() * utilization; + + float remainingAmount = this.remainingAmount - consumed; + this.remainingAmount = remainingAmount; + + long duration = (long) Math.ceil(remainingAmount / limit * 1000); + + if (duration <= 0) { + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + output.push(limit); + return now + duration; + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java new file mode 100644 index 00000000..e8abc2d7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/source/TraceFlowSource.java @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.source; + +import java.util.function.Consumer; +import org.opendc.simulator.flow2.FlowGraph; +import org.opendc.simulator.flow2.FlowStage; +import org.opendc.simulator.flow2.FlowStageLogic; +import org.opendc.simulator.flow2.OutHandler; +import org.opendc.simulator.flow2.OutPort; +import org.opendc.simulator.flow2.Outlet; + +/** + * A flow source that replays a sequence of fragments, each indicating the flow rate for some period of time. + */ +public final class TraceFlowSource implements FlowSource, FlowStageLogic { + private final OutPort output; + private final long[] deadlines; + private final float[] usages; + private final int size; + private int index; + + private final FlowStage stage; + private final Consumer<TraceFlowSource> completionHandler; + + /** + * Construct a {@link TraceFlowSource}. + * + * @param graph The {@link FlowGraph} to which the source belongs. + * @param trace The {@link Trace} to replay. + * @param completionHandler The completion handler to invoke when the source finishes. + */ + public TraceFlowSource(FlowGraph graph, Trace trace, Consumer<TraceFlowSource> completionHandler) { + this.stage = graph.newStage(this); + this.output = stage.getOutlet("out"); + this.output.setHandler(new OutHandler() { + @Override + public void onPull(OutPort port, float capacity) {} + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + // Source cannot complete without re-connecting to another sink, so mark the source as completed + completionHandler.accept(TraceFlowSource.this); + } + }); + this.deadlines = trace.deadlines; + this.usages = trace.usages; + this.size = trace.size; + this.completionHandler = completionHandler; + } + + /** + * Construct a {@link TraceFlowSource}. + * + * @param graph The {@link FlowGraph} to which the source belongs. + * @param trace The {@link Trace} to replay. + */ + public TraceFlowSource(FlowGraph graph, Trace trace) { + this(graph, trace, TraceFlowSource::close); + } + + @Override + public Outlet getOutput() { + return output; + } + + /** + * Remove this node from the graph. + */ + public void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + int size = this.size; + int index = this.index; + long[] deadlines = this.deadlines; + long deadline; + + do { + deadline = deadlines[index]; + } while (deadline <= now && ++index < size); + + if (index >= size) { + output.push(0.0f); + completionHandler.accept(this); + return Long.MAX_VALUE; + } + + this.index = index; + float usage = usages[index]; + output.push(usage); + + return deadline; + } + + /** + * A trace describes the workload over time. + */ + public static final class Trace { + private final long[] deadlines; + private final float[] usages; + private final int size; + + /** + * Construct a {@link Trace}. + * + * @param deadlines The deadlines of the trace fragments. + * @param usages The usages of the trace fragments. + * @param size The size of the trace. + */ + public Trace(long[] deadlines, float[] usages, int size) { + this.deadlines = deadlines; + this.usages = usages; + this.size = size; + } + + public long[] getDeadlines() { + return deadlines; + } + + public float[] getUsages() { + return usages; + } + + public int getSize() { + return size; + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java index d8ad7978..51ea7df3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransform.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,29 +20,22 @@ * SOFTWARE. */ -package org.opendc.simulator.flow +package org.opendc.simulator.flow2.util; + +import org.opendc.simulator.flow2.FlowGraph; /** - * An interface that tracks cumulative counts of the flow accumulation over a stage. + * A {@link FlowTransform} describes a transformation between two components in a {@link FlowGraph} that might operate + * at different units of flow. */ -public interface FlowCounters { - /** - * The accumulated flow that a source wanted to push over the connection. - */ - public val demand: Double - - /** - * The accumulated flow that was actually transferred over the connection. - */ - public val actual: Double - +public interface FlowTransform { /** - * The amount of capacity that was not utilized. + * Apply the transform to the specified flow rate. */ - public val remaining: Double + float apply(float value); /** - * Reset the flow counters. + * Apply the inverse of the transformation to the specified flow rate. */ - public fun reset() + float applyInverse(float value); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java new file mode 100644 index 00000000..852240d8 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransformer.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.util; + +import org.opendc.simulator.flow2.*; +import org.opendc.simulator.flow2.sink.FlowSink; +import org.opendc.simulator.flow2.source.FlowSource; + +/** + * Helper class to transform flow from outlet to inlet. + */ +public final class FlowTransformer implements FlowStageLogic, FlowSource, FlowSink { + private final FlowStage stage; + private final InPort input; + private final OutPort output; + + /** + * Construct a new {@link FlowTransformer}. + */ + public FlowTransformer(FlowGraph graph, FlowTransform transform) { + this.stage = graph.newStage(this); + this.input = stage.getInlet("in"); + this.output = stage.getOutlet("out"); + + this.input.setHandler(new ForwardInHandler(output, transform)); + this.input.setMask(true); + this.output.setHandler(new ForwardOutHandler(input, transform)); + this.output.setMask(true); + } + + /** + * Return the {@link Outlet} of the transformer. + */ + @Override + public Outlet getOutput() { + return output; + } + + /** + * Return the {@link Inlet} of the transformer. + */ + @Override + public Inlet getInput() { + return input; + } + + /** + * Close the transformer. + */ + void close() { + stage.close(); + } + + @Override + public long onUpdate(FlowStage ctx, long now) { + return Long.MAX_VALUE; + } + + private static class ForwardInHandler implements InHandler { + private final OutPort output; + private final FlowTransform transform; + + ForwardInHandler(OutPort output, FlowTransform transform) { + this.output = output; + this.transform = transform; + } + + @Override + public float getRate(InPort port) { + return transform.applyInverse(output.getRate()); + } + + @Override + public void onPush(InPort port, float demand) { + float rate = transform.apply(demand); + output.push(rate); + } + + @Override + public void onUpstreamFinish(InPort port, Throwable cause) { + output.fail(cause); + } + } + + private static class ForwardOutHandler implements OutHandler { + private final InPort input; + private final FlowTransform transform; + + ForwardOutHandler(InPort input, FlowTransform transform) { + this.input = input; + this.transform = transform; + } + + @Override + public void onPull(OutPort port, float capacity) { + input.pull(transform.applyInverse(capacity)); + } + + @Override + public void onDownstreamFinish(OutPort port, Throwable cause) { + input.cancel(cause); + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java index c320a362..428dbfca 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/util/FlowTransforms.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,34 +20,38 @@ * SOFTWARE. */ -package org.opendc.simulator.flow.internal - -import org.opendc.simulator.flow.FlowCounters +package org.opendc.simulator.flow2.util; /** - * Mutable implementation of the [FlowCounters] interface. + * A collection of common {@link FlowTransform} implementations. */ -public class MutableFlowCounters : FlowCounters { - override val demand: Double - get() = _counters[0] - override val actual: Double - get() = _counters[1] - override val remaining: Double - get() = _counters[2] - private val _counters = DoubleArray(3) +public class FlowTransforms { + /** + * Prevent construction of this class. + */ + private FlowTransforms() {} - override fun reset() { - _counters.fill(0.0) + /** + * Return a {@link FlowTransform} that forwards the flow rate unmodified. + */ + public static FlowTransform noop() { + return NoopFlowTransform.INSTANCE; } - public fun increment(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters[0] += demand - counters[1] += actual - counters[2] += remaining - } + /** + * No-op implementation of a {@link FlowTransform}. + */ + private static final class NoopFlowTransform implements FlowTransform { + static final NoopFlowTransform INSTANCE = new NoopFlowTransform(); + + @Override + public float apply(float value) { + return value; + } - override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" + @Override + public float applyInverse(float value) { + return value; + } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt deleted file mode 100644 index a49826f4..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException - -/** - * A consumer of a [FlowSource]. - */ -public interface FlowConsumer { - /** - * A flag to indicate that the consumer is currently consuming a [FlowSource]. - */ - public val isActive: Boolean - - /** - * The flow capacity of this consumer. - */ - public val capacity: Double - - /** - * The current flow rate of the consumer. - */ - public val rate: Double - - /** - * The current flow demand. - */ - public val demand: Double - - /** - * The flow counters to track the flow metrics of the consumer. - */ - public val counters: FlowCounters - - /** - * Start consuming the specified [source]. - * - * @throws IllegalStateException if the consumer is already active. - */ - public fun startConsumer(source: FlowSource) - - /** - * Ask the consumer to pull its source. - * - * If the consumer is not active, this operation will be a no-op. - */ - public fun pull() - - /** - * Disconnect the consumer from its source. - * - * If the consumer is not active, this operation will be a no-op. - */ - public fun cancel() -} - -/** - * Consume the specified [source] and suspend execution until the source is fully consumed or failed. - */ -public suspend fun FlowConsumer.consume(source: FlowSource) { - return suspendCancellableCoroutine { cont -> - startConsumer(object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - try { - source.onStart(conn, now) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onStop(conn: FlowConnection, now: Long) { - try { - source.onStop(conn, now) - - if (!cont.isCompleted) { - cont.resume(Unit) - } - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return try { - source.onPull(conn, now) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onConverge(conn: FlowConnection, now: Long) { - try { - source.onConverge(conn, now) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun toString(): String = "SuspendingFlowSource" - }) - - cont.invokeOnCancellation { cancel() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt deleted file mode 100644 index 1d3adb10..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A collection of callbacks associated with a [FlowConsumer]. - */ -public interface FlowConsumerLogic { - /** - * This method is invoked when a [FlowSource] changes the rate of flow to this consumer. - * - * @param ctx The context in which the provider runs. - * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param rate The requested processing rate of the source. - */ - public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {} - - /** - * This method is invoked when the flow graph has converged into a steady-state system. - * - * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method - * will not be invoked. - * - * @param ctx The context in which the provider runs. - * @param now The virtual timestamp in milliseconds at which the system converged. - */ - public fun onConverge(ctx: FlowConsumerContext, now: Long) {} - - /** - * This method is invoked when the [FlowSource] completed or failed. - * - * @param ctx The context in which the provider runs. - * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param cause The cause of the failure or `null` if the source completed. - */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {} -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt deleted file mode 100644 index 65224827..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.FlowEngineImpl -import java.time.Clock -import kotlin.coroutines.CoroutineContext - -/** - * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s. - * - * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation - * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. - */ -public interface FlowEngine { - /** - * The virtual [Clock] associated with this engine. - */ - public val clock: Clock - - /** - * Create a new [FlowConsumerContext] with the given [provider]. - * - * @param consumer The consumer logic. - * @param provider The logic of the resource provider. - */ - public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext - - /** - * Start batching the execution of resource updates until [popBatch] is called. - * - * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs - * simultaneously) in a single state update. - * - * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the - * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called - * the same amount of times. To simplify batching, see [batch]. - */ - public fun pushBatch() - - /** - * Stop the batching of resource updates and run the interpreter on the batch. - * - * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. - */ - public fun popBatch() - - public companion object { - /** - * Construct a new [FlowEngine] implementation. - * - * @param context The coroutine context to use. - * @param clock The virtual simulation clock. - */ - @JvmStatic - @JvmName("create") - public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine { - return FlowEngineImpl(context, clock) - } - } -} - -/** - * Batch the execution of several interrupts into a single call. - * - * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. - */ -public inline fun FlowEngine.batch(block: () -> Unit) { - try { - pushBatch() - block() - } finally { - popBatch() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt deleted file mode 100644 index 5202c252..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import mu.KotlinLogging -import org.opendc.simulator.flow.internal.D_MS_TO_S -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * The logging instance of this connection. - */ -private val logger = KotlinLogging.logger {} - -/** - * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. - * - * @param engine The [FlowEngine] the forwarder runs in. - * @param listener The convergence lister to use. - * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. - */ -public class FlowForwarder( - private val engine: FlowEngine, - private val listener: FlowConvergenceListener? = null, - private val isCoupled: Boolean = false -) : FlowSource, FlowConsumer, AutoCloseable { - /** - * The delegate [FlowSource]. - */ - private var delegate: FlowSource? = null - - /** - * A flag to indicate that the delegate was started. - */ - private var hasDelegateStarted: Boolean = false - - /** - * The exposed [FlowConnection]. - */ - private val _ctx = object : FlowConnection { - override var shouldSourceConverge: Boolean = false - set(value) { - field = value - _innerCtx?.shouldSourceConverge = value - } - - override val capacity: Double - get() = _innerCtx?.capacity ?: 0.0 - - override val demand: Double - get() = _innerCtx?.demand ?: 0.0 - - override val rate: Double - get() = _innerCtx?.rate ?: 0.0 - - override fun pull() { - _innerCtx?.pull() - } - - override fun pull(now: Long) { - _innerCtx?.pull(now) - } - - override fun push(rate: Double) { - if (delegate == null) { - return - } - - _innerCtx?.push(rate) - _demand = rate - } - - override fun close() { - val delegate = delegate ?: return - val hasDelegateStarted = hasDelegateStarted - - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - - if (hasDelegateStarted) { - val now = engine.clock.millis() - delegate.onStop(this, now) - } - } - } - - /** - * The [FlowConnection] in which the forwarder runs. - */ - private var _innerCtx: FlowConnection? = null - - override val isActive: Boolean - get() = delegate != null - - override val capacity: Double - get() = _ctx.capacity - - override val rate: Double - get() = _ctx.rate - - override val demand: Double - get() = _ctx.demand - - override val counters: FlowCounters - get() = _counters - private val _counters = MutableFlowCounters() - - override fun startConsumer(source: FlowSource) { - check(delegate == null) { "Forwarder already active" } - - delegate = source - - // Pull to replace the source - pull() - } - - override fun pull() { - _ctx.pull() - } - - override fun cancel() { - _ctx.close() - } - - override fun close() { - val ctx = _innerCtx - - if (ctx != null) { - this._innerCtx = null - ctx.pull() - } - } - - override fun onStart(conn: FlowConnection, now: Long) { - _innerCtx = conn - - if (listener != null || _ctx.shouldSourceConverge) { - conn.shouldSourceConverge = true - } - } - - override fun onStop(conn: FlowConnection, now: Long) { - _innerCtx = null - - val delegate = delegate - if (delegate != null) { - reset() - - try { - delegate.onStop(this._ctx, now) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - } - } - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val delegate = delegate - - if (!hasDelegateStarted) { - start() - } - - updateCounters(conn, now) - - return try { - delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - - reset() - Long.MAX_VALUE - } - } - - override fun onConverge(conn: FlowConnection, now: Long) { - try { - delegate?.onConverge(this._ctx, now) - listener?.onConverge(now) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - - _innerCtx = null - reset() - } - } - - /** - * Start the delegate. - */ - private fun start() { - val delegate = delegate ?: return - - try { - val now = engine.clock.millis() - delegate.onStart(_ctx, now) - hasDelegateStarted = true - _lastUpdate = now - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - reset() - } - } - - /** - * Reset the delegate. - */ - private fun reset() { - if (isCoupled) { - _innerCtx?.close() - } else { - _innerCtx?.push(0.0) - } - - delegate = null - hasDelegateStarted = false - } - - /** - * The requested flow rate. - */ - private var _demand: Double = 0.0 - private var _lastUpdate = 0L - - /** - * Update the flow counters for the transformer. - */ - private fun updateCounters(ctx: FlowConnection, now: Long) { - val lastUpdate = _lastUpdate - _lastUpdate = now - val delta = now - lastUpdate - if (delta <= 0) { - return - } - - val counters = _counters - val deltaS = delta * D_MS_TO_S - val total = ctx.capacity * deltaS - val work = _demand * deltaS - val actualWork = ctx.rate * deltaS - - counters.increment(work, actualWork, (total - actualWork)) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt deleted file mode 100644 index af702701..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A [FlowConsumer] that maps the pushed flow through [transform]. - * - * @param source The source of the flow. - * @param transform The method to transform the flow. - */ -public class FlowMapper( - private val source: FlowSource, - private val transform: (FlowConnection, Double) -> Double -) : FlowSource { - - /** - * The current active connection. - */ - private var _conn: Connection? = null - - override fun onStart(conn: FlowConnection, now: Long) { - check(_conn == null) { "Concurrent access" } - val delegate = Connection(conn, transform) - _conn = delegate - source.onStart(delegate, now) - } - - override fun onStop(conn: FlowConnection, now: Long) { - val delegate = checkNotNull(_conn) { "Invariant violation" } - _conn = null - source.onStop(delegate, now) - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val delegate = checkNotNull(_conn) { "Invariant violation" } - return source.onPull(delegate, now) - } - - override fun onConverge(conn: FlowConnection, now: Long) { - val delegate = _conn ?: return - source.onConverge(delegate, now) - } - - /** - * The wrapper [FlowConnection] that is used to transform the flow. - */ - private class Connection( - private val delegate: FlowConnection, - private val transform: (FlowConnection, Double) -> Double - ) : FlowConnection by delegate { - override fun push(rate: Double) { - delegate.push(transform(this, rate)) - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt deleted file mode 100644 index ee8cd739..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.D_MS_TO_S -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * A [FlowSink] represents a sink with a fixed capacity. - * - * @param initialCapacity The initial capacity of the resource. - * @param engine The engine that is used for driving the flow simulation. - * @param parent The parent flow system. - */ -public class FlowSink( - private val engine: FlowEngine, - initialCapacity: Double, - private val parent: FlowConvergenceListener? = null -) : FlowConsumer { - /** - * A flag to indicate that the flow consumer is active. - */ - public override val isActive: Boolean - get() = _ctx != null - - /** - * The capacity of the consumer. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - _ctx?.capacity = value - } - - /** - * The current processing rate of the consumer. - */ - public override val rate: Double - get() = _ctx?.rate ?: 0.0 - - /** - * The flow processing rate demand at this instant. - */ - public override val demand: Double - get() = _ctx?.demand ?: 0.0 - - /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = MutableFlowCounters() - - /** - * The current active [FlowConsumerLogic] of this sink. - */ - private var _ctx: FlowConsumerContext? = null - - override fun startConsumer(source: FlowSource) { - check(_ctx == null) { "Consumer is in invalid state" } - - val ctx = engine.newContext(source, Logic(parent, _counters)) - _ctx = ctx - - ctx.capacity = capacity - if (parent != null) { - ctx.shouldConsumerConverge = true - } - - ctx.start() - } - - override fun pull() { - _ctx?.pull() - } - - override fun cancel() { - _ctx?.close() - } - - override fun toString(): String = "FlowSink[capacity=$capacity]" - - /** - * [FlowConsumerLogic] of a sink. - */ - private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { - - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - rate: Double - ) { - updateCounters(ctx, now, rate, ctx.capacity) - } - - override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { - updateCounters(ctx, now, 0.0, 0.0) - - _ctx = null - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long) { - parent?.onConverge(now) - } - - /** - * The previous demand and capacity for the consumer. - */ - private val _previous = DoubleArray(2) - private var _previousUpdate = Long.MAX_VALUE - - /** - * Update the counters of the flow consumer. - */ - private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) { - val previousUpdate = _previousUpdate - _previousUpdate = now - val delta = now - previousUpdate - - val counters = counters - val previous = _previous - val demand = previous[0] - val capacity = previous[1] - - previous[0] = nextDemand - previous[1] = nextCapacity - - if (delta <= 0) { - return - } - - val deltaS = delta * D_MS_TO_S - val total = demand * deltaS - val work = capacity * deltaS - val actualWork = ctx.rate * deltaS - - counters.increment(work, actualWork, (total - actualWork)) - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt deleted file mode 100644 index a48ac18e..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A source of flow that is consumed by a [FlowConsumer]. - * - * Implementations of this interface should be considered stateful and must be assumed not to be re-usable - * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. - */ -public interface FlowSource { - /** - * This method is invoked when the source is started. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the provider finished. - */ - public fun onStart(conn: FlowConnection, now: Long) {} - - /** - * This method is invoked when the source is finished. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the source finished. - */ - public fun onStop(conn: FlowConnection, now: Long) {} - - /** - * This method is invoked when the source is pulled. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the pull is occurring. - * @return The duration after which the resource consumer should be pulled again. - */ - public fun onPull(conn: FlowConnection, now: Long): Long - - /** - * This method is invoked when the flow graph has converged into a steady-state system. - * - * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method - * will not be invoked. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the system converged. - */ - public fun onConverge(conn: FlowConnection, now: Long) {} -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt deleted file mode 100644 index 97d56fff..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -/** - * States of the flow connection. - */ -internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source -internal const val ConnActive = 1 // Connection is active and the source is currently being consumed -internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore -internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection - -/** - * Flags of the flow connection - */ -internal const val ConnPulled = 1 shl 2 // The source should be pulled -internal const val ConnPushed = 1 shl 3 // The source has pushed a value -internal const val ConnClose = 1 shl 4 // The connection should be closed -internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active -internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending -internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending -internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source -internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer -internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt deleted file mode 100644 index fba3af5f..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import mu.KotlinLogging -import org.opendc.simulator.flow.FlowConsumerContext -import org.opendc.simulator.flow.FlowConsumerLogic -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.batch -import java.util.ArrayDeque -import kotlin.math.min - -/** - * The logging instance of this connection. - */ -private val logger = KotlinLogging.logger {} - -/** - * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. - */ -internal class FlowConsumerContextImpl( - private val engine: FlowEngineImpl, - private val source: FlowSource, - private val logic: FlowConsumerLogic -) : FlowConsumerContext { - /** - * The capacity of the connection. - */ - override var capacity: Double - get() = _capacity - set(value) { - val oldValue = _capacity - - // Only changes will be propagated - if (value != oldValue) { - _capacity = value - pull() - } - } - private var _capacity: Double = 0.0 - - /** - * The current processing rate of the connection. - */ - override val rate: Double - get() = _rate - private var _rate = 0.0 - - /** - * The current flow processing demand. - */ - override val demand: Double - get() = _demand - private var _demand: Double = 0.0 // The current (pending) demand of the source - - /** - * The deadline of the source. - */ - override val deadline: Long - get() = _deadline - private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer - - /** - * Flags to control the convergence of the consumer and source. - */ - override var shouldSourceConverge: Boolean - get() = _flags and ConnConvergeSource == ConnConvergeSource - set(value) { - _flags = - if (value) { - _flags or ConnConvergeSource - } else { - _flags and ConnConvergeSource.inv() - } - } - override var shouldConsumerConverge: Boolean - get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer - set(value) { - _flags = - if (value) { - _flags or ConnConvergeConsumer - } else { - _flags and ConnConvergeConsumer.inv() - } - } - - /** - * Flag to control the timers on the [FlowSource] - */ - override var enableTimers: Boolean - get() = _flags and ConnDisableTimers != ConnDisableTimers - set(value) { - _flags = - if (!value) { - _flags or ConnDisableTimers - } else { - _flags and ConnDisableTimers.inv() - } - } - - /** - * The clock to track simulation time. - */ - private val _clock = engine.clock - - /** - * The flags of the flow connection, indicating certain actions. - */ - private var _flags: Int = 0 - - /** - * The timers at which the context is scheduled to be interrupted. - */ - private var _timer: Long = Long.MAX_VALUE - private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5) - - override fun start() { - check(_flags and ConnState == ConnPending) { "Consumer is already started" } - engine.batch { - val now = _clock.millis() - source.onStart(this, now) - - // Mark the connection as active and pulled - val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled - scheduleImmediate(now, newFlags) - } - } - - override fun close() { - val flags = _flags - if (flags and ConnState == ConnClosed) { - return - } - - // Toggle the close bit. In case no update is active, schedule a new update. - if (flags and ConnUpdateActive == 0) { - val now = _clock.millis() - scheduleImmediate(now, flags or ConnClose) - } else { - _flags = flags or ConnClose - } - } - - override fun pull(now: Long) { - val flags = _flags - if (flags and ConnState != ConnActive) { - return - } - - // Mark connection as pulled - scheduleImmediate(now, flags or ConnPulled) - } - - override fun pull() { - pull(_clock.millis()) - } - - override fun pullSync(now: Long) { - val flags = _flags - - // Do not attempt to flush the connection if the connection is closed or an update is already active - if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) { - return - } - - if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { - engine.scheduleSync(now, this) - } - } - - override fun push(rate: Double) { - if (_demand == rate) { - return - } - - _demand = rate - - val flags = _flags - - if (flags and ConnUpdateActive != 0) { - // If an update is active, it will already get picked up at the end of the update - _flags = flags or ConnPushed - } else { - // Invalidate only if no update is active - scheduleImmediate(_clock.millis(), flags or ConnPushed) - } - } - - /** - * Update the state of the flow connection. - * - * @param now The current virtual timestamp. - * @param visited The queue of connections that have been visited during the cycle. - * @param timerQueue The queue of all pending timers. - * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update. - */ - fun doUpdate( - now: Long, - visited: FlowDeque, - timerQueue: FlowTimerQueue, - isImmediate: Boolean - ) { - var flags = _flags - - // Precondition: The flow connection must be active - if (flags and ConnState != ConnActive) { - return - } - - val deadline = _deadline - val reachedDeadline = deadline == now - var newDeadline: Long - var hasUpdated = false - - try { - // Pull the source if (1) `pull` is called or (2) the timer of the source has expired - newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { - // Update state before calling into the outside world, so it observes a consistent state - _flags = (flags and ConnPulled.inv()) or ConnUpdateActive - hasUpdated = true - - val duration = source.onPull(this, now) - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - flags = _flags - - if (duration != Long.MAX_VALUE) { - now + duration - } else { - duration - } - } else { - deadline - } - - // Make the new deadline available for the consumer if it has changed - if (newDeadline != deadline) { - _deadline = newDeadline - } - - // Push to the consumer if the rate of the source has changed (after a call to `push`) - if (flags and ConnPushed != 0) { - // Update state before calling into the outside world, so it observes a consistent state - _flags = (flags and ConnPushed.inv()) or ConnUpdateActive - hasUpdated = true - - logic.onPush(this, now, _demand) - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - flags = _flags - } - - // Check whether the source or consumer have tried to close the connection - if (flags and ConnClose != 0) { - hasUpdated = true - - // The source has called [FlowConnection.close], so clean up the connection - doStopSource(now) - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - // We now also mark the connection as closed - flags = (_flags and ConnState.inv()) or ConnClosed - - _demand = 0.0 - newDeadline = Long.MAX_VALUE - } - } catch (cause: Throwable) { - hasUpdated = true - - // Clean up the connection - doFailSource(now, cause) - - // Mark the connection as closed - flags = (flags and ConnState.inv()) or ConnClosed - - _demand = 0.0 - newDeadline = Long.MAX_VALUE - } - - // Check whether the connection needs to be added to the visited queue. This is the case when: - // (1) An update was performed (either a push or a pull) - // (2) Either the source or consumer want to converge, and - // (3) Convergence is not already pending (ConnConvergePending) - if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) { - visited.add(this) - flags = flags or ConnConvergePending - } - - // Compute the new flow rate of the connection - // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value - _rate = min(_capacity, _demand) - - // Indicate that no update is active anymore and flush the flags - _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv() - - val pendingTimers = _pendingTimers - - // Prune the head timer if this is a delayed update - val timer = if (!isImmediate) { - // Invariant: Any pending timer should only point to a future timestamp - val timer = pendingTimers.poll() ?: Long.MAX_VALUE - _timer = timer - timer - } else { - _timer - } - - // Check whether we need to schedule a new timer for this connection. That is the case when: - // (1) The deadline is valid (not the maximum value) - // (2) The connection is active - // (3) Timers are not disabled for the source - // (4) The current active timer for the connection points to a later deadline - if (newDeadline == Long.MAX_VALUE || - flags and ConnState != ConnActive || - flags and ConnDisableTimers != 0 || - (timer != Long.MAX_VALUE && newDeadline >= timer) - ) { - // Ignore any deadline scheduled at the maximum value - // This indicates that the source does not want to register a timer - return - } - - // Construct a timer for the new deadline and add it to the global queue of timers - _timer = newDeadline - timerQueue.add(this, newDeadline) - - // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers - if (timer != Long.MAX_VALUE) { - pendingTimers.addFirst(timer) - } - } - - /** - * This method is invoked when the system converges into a steady state. - */ - fun onConverge(now: Long) { - try { - val flags = _flags - - // The connection is converging now, so unset the convergence pending flag - _flags = flags and ConnConvergePending.inv() - - // Call the source converge callback if it has enabled convergence - if (flags and ConnConvergeSource != 0) { - source.onConverge(this, now) - } - - // Call the consumer callback if it has enabled convergence - if (flags and ConnConvergeConsumer != 0) { - logic.onConverge(this, now) - } - } catch (cause: Throwable) { - // Invoke the finish callbacks - doFailSource(now, cause) - - // Mark the connection as closed - _flags = (_flags and ConnState.inv()) or ConnClosed - _demand = 0.0 - _deadline = Long.MAX_VALUE - } - } - - override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" - - /** - * Stop the [FlowSource]. - */ - private fun doStopSource(now: Long) { - try { - source.onStop(this, now) - doFinishConsumer(now, null) - } catch (cause: Throwable) { - doFinishConsumer(now, cause) - } - } - - /** - * Fail the [FlowSource]. - */ - private fun doFailSource(now: Long, cause: Throwable) { - try { - source.onStop(this, now) - } catch (e: Throwable) { - e.addSuppressed(cause) - doFinishConsumer(now, e) - } - } - - /** - * Finish the consumer. - */ - private fun doFinishConsumer(now: Long, cause: Throwable?) { - try { - logic.onFinish(this, now, cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - logger.error(e) { "Uncaught exception" } - } - } - - /** - * Schedule an immediate update for this connection. - */ - private fun scheduleImmediate(now: Long, flags: Int) { - // In case an immediate update is already scheduled, no need to do anything - if (flags and ConnUpdatePending != 0) { - _flags = flags - return - } - - // Mark the connection that there is an update pending - _flags = flags or ConnUpdatePending - - engine.scheduleImmediate(now, this) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt deleted file mode 100644 index 403a9aec..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import java.util.ArrayDeque - -/** - * A specialized [ArrayDeque] that tracks the [FlowConsumerContextImpl] instances that have updated in an interpreter - * cycle. - * - * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -internal class FlowDeque(initialCapacity: Int = 256) { - /** - * The array of elements in the queue. - */ - private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity) - private var _head = 0 - private var _tail = 0 - - /** - * Determine whether this queue is not empty. - */ - fun isNotEmpty(): Boolean { - return _head != _tail - } - - /** - * Add the specified [ctx] to the queue. - */ - fun add(ctx: FlowConsumerContextImpl) { - val es = _elements - var tail = _tail - - es[tail] = ctx - - tail = inc(tail, es.size) - _tail = tail - - if (_head == tail) { - doubleCapacity() - } - } - - /** - * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty. - */ - fun poll(): FlowConsumerContextImpl? { - val es = _elements - val head = _head - val ctx = es[head] - - if (ctx != null) { - es[head] = null - _head = inc(head, es.size) - } - - return ctx - } - - /** - * Clear the queue. - */ - fun clear() { - _elements.fill(null) - _head = 0 - _tail = 0 - } - - private fun inc(i: Int, modulus: Int): Int { - var x = i - if (++x >= modulus) { - x = 0 - } - return x - } - - /** - * Doubles the capacity of this deque - */ - private fun doubleCapacity() { - assert(_head == _tail) - val p = _head - val n = _elements.size - val r = n - p // number of elements to the right of p - - val newCapacity = n shl 1 - check(newCapacity >= 0) { "Sorry, deque too big" } - - val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity) - - _elements.copyInto(a, 0, p, n) - _elements.copyInto(a, r, 0, p) - - _elements = a - _head = 0 - _tail = n - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt deleted file mode 100644 index 6fd1ef31..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import kotlinx.coroutines.Delay -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Runnable -import org.opendc.simulator.flow.FlowConsumerContext -import org.opendc.simulator.flow.FlowConsumerLogic -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSource -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext - -/** - * Internal implementation of the [FlowEngine] interface. - * - * @param context The coroutine context to use. - * @param clock The virtual simulation clock. - */ -internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable { - /** - * The [Delay] instance that provides scheduled execution of [Runnable]s. - */ - @OptIn(InternalCoroutinesApi::class) - private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } - - /** - * The queue of connection updates that are scheduled for immediate execution. - */ - private val queue = FlowDeque() - - /** - * A priority queue containing the connection updates to be scheduled in the future. - */ - private val futureQueue = FlowTimerQueue() - - /** - * The stack of engine invocations to occur in the future. - */ - private val futureInvocations = ArrayDeque<Invocation>() - - /** - * The systems that have been visited during the engine cycle. - */ - private val visited = FlowDeque() - - /** - * The index in the batch stack. - */ - private var batchIndex = 0 - - /** - * The virtual [Clock] of this engine. - */ - override val clock: Clock - get() = _clock - private val _clock: Clock = clock - - /** - * Update the specified [ctx] synchronously. - */ - fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (batchIndex > 0) { - return - } - - doRunEngine(now) - } - - /** - * Enqueue the specified [ctx] to be updated immediately during the active engine cycle. - * - * This method should be used when the state of a flow context is invalidated/interrupted and needs to be - * re-computed. In case no engine is currently active, the engine will be started. - */ - fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) { - queue.add(ctx) - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (batchIndex > 0) { - return - } - - doRunEngine(now) - } - - override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) - - override fun pushBatch() { - batchIndex++ - } - - override fun popBatch() { - try { - // Flush the work if the engine is not already running - if (batchIndex == 1 && queue.isNotEmpty()) { - doRunEngine(_clock.millis()) - } - } finally { - batchIndex-- - } - } - - /* Runnable */ - override fun run() { - val invocation = futureInvocations.poll() // Clear invocation from future invocation queue - doRunEngine(invocation.timestamp) - } - - /** - * Run all the enqueued actions for the specified [timestamp][now]. - */ - private fun doRunEngine(now: Long) { - val queue = queue - val futureQueue = futureQueue - val futureInvocations = futureInvocations - val visited = visited - - try { - // Increment batch index so synchronous calls will not launch concurrent engine invocations - batchIndex++ - - // Execute all scheduled updates at current timestamp - while (true) { - val ctx = futureQueue.poll(now) ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = false) - } - - // Repeat execution of all immediate updates until the system has converged to a steady-state - // We have to take into account that the onConverge callback can also trigger new actions. - do { - // Execute all immediate updates - while (true) { - val ctx = queue.poll() ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) - } - - while (true) { - val ctx = visited.poll() ?: break - ctx.onConverge(now) - } - } while (queue.isNotEmpty()) - } finally { - // Decrement batch index to indicate no engine is active at the moment - batchIndex-- - } - - // Schedule an engine invocation for the next update to occur. - val headDeadline = futureQueue.peekDeadline() - if (headDeadline != Long.MAX_VALUE) { - trySchedule(now, futureInvocations, headDeadline) - } - } - - /** - * Try to schedule an engine invocation at the specified [target]. - * - * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the engine invocation should happen. - * @param scheduled The queue of scheduled invocations. - */ - private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { - val head = scheduled.peek() - - // Only schedule a new scheduler invocation in case the target is earlier than all other pending - // scheduler invocations - if (head == null || target < head.timestamp) { - @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout(target - now, this, context) - scheduled.addFirst(Invocation(target, handle)) - } - } - - /** - * A future engine invocation. - * - * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case - * the invocation is not needed anymore, it can be cancelled via [cancel]. - */ - private class Invocation( - @JvmField val timestamp: Long, - @JvmField val handle: DisposableHandle - ) { - /** - * Cancel the engine invocation. - */ - fun cancel() = handle.dispose() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt deleted file mode 100644 index 47061a91..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -/** - * Specialized priority queue for flow timers. - * - * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation - * being generic. - */ -internal class FlowTimerQueue(initialCapacity: Int = 256) { - /** - * The binary heap of deadlines. - */ - private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE } - - /** - * The binary heap of [FlowConsumerContextImpl]s. - */ - private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity) - - /** - * The number of elements in the priority queue. - */ - private var size = 0 - - /** - * Register a timer for [ctx] with [deadline]. - */ - fun add(ctx: FlowConsumerContextImpl, deadline: Long) { - val i = size - var deadlines = _deadlines - if (i >= deadlines.size) { - grow() - // Re-fetch the resized array - deadlines = _deadlines - } - - siftUp(deadlines, _pending, i, ctx, deadline) - - size = i + 1 - } - - /** - * Update all pending [FlowConsumerContextImpl]s at the timestamp [now]. - */ - fun poll(now: Long): FlowConsumerContextImpl? { - if (size == 0) { - return null - } - - val deadlines = _deadlines - val deadline = deadlines[0] - - if (now < deadline) { - return null - } - - val pending = _pending - val res = pending[0] - val s = --size - - val nextDeadline = deadlines[s] - val next = pending[s]!! - - // Clear the last element of the queue - pending[s] = null - deadlines[s] = Long.MIN_VALUE - - if (s != 0) { - siftDown(deadlines, pending, next, nextDeadline) - } - - return res - } - - /** - * Find the earliest deadline in the queue. - */ - fun peekDeadline(): Long { - return if (size == 0) Long.MAX_VALUE else _deadlines[0] - } - - /** - * Increases the capacity of the array. - */ - private fun grow() { - val oldCapacity = _deadlines.size - // Double size if small; else grow by 50% - val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1 - - _deadlines = _deadlines.copyOf(newCapacity) - _pending = _pending.copyOf(newCapacity) - } - - /** - * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is - * greater than or equal to its parent, or is the root. - * - * @param deadlines The heap of deadlines. - * @param pending The heap of contexts. - * @param pos The position to fill. - * @param ctx The [FlowConsumerContextImpl] to insert. - * @param deadline The deadline of the context. - */ - private fun siftUp( - deadlines: LongArray, - pending: Array<FlowConsumerContextImpl?>, - pos: Int, - ctx: FlowConsumerContextImpl, - deadline: Long - ) { - var k = pos - - while (k > 0) { - val parent = (k - 1) ushr 1 - val parentDeadline = deadlines[parent] - - if (deadline >= parentDeadline) { - break - } - - deadlines[k] = parentDeadline - pending[k] = pending[parent] - - k = parent - } - - deadlines[k] = deadline - pending[k] = ctx - } - - /** - * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it - * is less than or equal to its children or is a leaf. - * - * @param deadlines The heap of deadlines. - * @param pending The heap of contexts. - * @param ctx The [FlowConsumerContextImpl] to insert. - * @param deadline The deadline of the context. - */ - private fun siftDown( - deadlines: LongArray, - pending: Array<FlowConsumerContextImpl?>, - ctx: FlowConsumerContextImpl, - deadline: Long - ) { - var k = 0 - val size = size - val half = size ushr 1 - - while (k < half) { - var child = (k shl 1) + 1 - - var childDeadline = deadlines[child] - val right = child + 1 - - if (right < size) { - val rightDeadline = deadlines[right] - - if (childDeadline > rightDeadline) { - child = right - childDeadline = rightDeadline - } - } - - if (deadline <= childDeadline) { - break - } - - deadlines[k] = childDeadline - pending[k] = pending[child] - - k = child - } - - deadlines[k] = deadline - pending[k] = ctx - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt deleted file mode 100644 index 8752c559..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConsumer -import org.opendc.simulator.flow.FlowCounters -import org.opendc.simulator.flow.FlowSource - -/** - * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. - */ -public interface FlowMultiplexer { - /** - * The maximum number of inputs supported by the multiplexer. - */ - public val maxInputs: Int - - /** - * The maximum number of outputs supported by the multiplexer. - */ - public val maxOutputs: Int - - /** - * The inputs of the multiplexer that can be used to consume sources. - */ - public val inputs: Set<FlowConsumer> - - /** - * The outputs of the multiplexer over which the flows will be distributed. - */ - public val outputs: Set<FlowSource> - - /** - * The actual processing rate of the multiplexer. - */ - public val rate: Double - - /** - * The demanded processing rate of the input. - */ - public val demand: Double - - /** - * The capacity of the outputs. - */ - public val capacity: Double - - /** - * The flow counters to track the flow metrics of all multiplexer inputs. - */ - public val counters: FlowCounters - - /** - * Create a new input on this multiplexer with a coupled capacity. - */ - public fun newInput(): FlowConsumer - - /** - * Create a new input on this multiplexer with the specified [capacity]. - * - * @param capacity The capacity of the input. - */ - public fun newInput(capacity: Double): FlowConsumer - - /** - * Remove [input] from this multiplexer. - */ - public fun removeInput(input: FlowConsumer) - - /** - * Create a new output on this multiplexer. - */ - public fun newOutput(): FlowSource - - /** - * Remove [output] from this multiplexer. - */ - public fun removeOutput(output: FlowSource) - - /** - * Clear all inputs and outputs from the multiplexer. - */ - public fun clear() - - /** - * Clear the inputs of the multiplexer. - */ - public fun clearInputs() - - /** - * Clear the outputs of the multiplexer. - */ - public fun clearOutputs() - - /** - * Flush the counters of the multiplexer. - */ - public fun flushCounters() - - /** - * Flush the counters of the specified [input]. - */ - public fun flushCounters(input: FlowConsumer) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt deleted file mode 100644 index a863e3ad..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine - -/** - * Factory interface for a [FlowMultiplexer] implementation. - */ -public fun interface FlowMultiplexerFactory { - /** - * Construct a new [FlowMultiplexer] using the specified [engine] and [listener]. - */ - public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer - - public companion object { - /** - * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer]. - */ - private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) } - - /** - * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer]. - */ - private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) } - - /** - * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances. - */ - @JvmStatic - public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY - - /** - * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances. - */ - @JvmStatic - public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt deleted file mode 100644 index 53f94a94..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowConsumer -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowCounters -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowForwarder -import org.opendc.simulator.flow.FlowSource -import java.util.ArrayDeque - -/** - * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means - * that a single input is directly connected to an output and that the multiplexer can only support as many - * inputs as outputs. - * - * @param engine The [FlowEngine] driving the simulation. - * @param listener The convergence listener of the multiplexer. - */ -public class ForwardingFlowMultiplexer( - private val engine: FlowEngine, - private val listener: FlowConvergenceListener? = null -) : FlowMultiplexer, FlowConvergenceListener { - - override val maxInputs: Int - get() = _outputs.size - - override val maxOutputs: Int = Int.MAX_VALUE - - override val inputs: Set<FlowConsumer> - get() = _inputs - private val _inputs = mutableSetOf<Input>() - - override val outputs: Set<FlowSource> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - private val _availableOutputs = ArrayDeque<Output>() - - override val counters: FlowCounters = object : FlowCounters { - override val demand: Double - get() = _outputs.sumOf { it.forwarder.counters.demand } - override val actual: Double - get() = _outputs.sumOf { it.forwarder.counters.actual } - override val remaining: Double - get() = _outputs.sumOf { it.forwarder.counters.remaining } - - override fun reset() { - for (output in _outputs) { - output.forwarder.counters.reset() - } - } - - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" - } - - override val rate: Double - get() = _outputs.sumOf { it.forwarder.rate } - - override val demand: Double - get() = _outputs.sumOf { it.forwarder.demand } - - override val capacity: Double - get() = _outputs.sumOf { it.forwarder.capacity } - - override fun newInput(): FlowConsumer { - val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val input = Input(output) - _inputs += input - return input - } - - override fun newInput(capacity: Double): FlowConsumer = newInput() - - override fun removeInput(input: FlowConsumer) { - if (!_inputs.remove(input)) { - return - } - - val output = (input as Input).output - output.forwarder.cancel() - _availableOutputs += output - } - - override fun newOutput(): FlowSource { - val forwarder = FlowForwarder(engine, this) - val output = Output(forwarder) - - _outputs += output - return output - } - - override fun removeOutput(output: FlowSource) { - if (!_outputs.remove(output)) { - return - } - - val forwarder = (output as Output).forwarder - forwarder.close() - } - - override fun clearInputs() { - for (input in _inputs) { - val output = input.output - output.forwarder.cancel() - _availableOutputs += output - } - - _inputs.clear() - } - - override fun clearOutputs() { - for (output in _outputs) { - output.forwarder.cancel() - } - _outputs.clear() - _availableOutputs.clear() - } - - override fun clear() { - clearOutputs() - clearInputs() - } - - override fun flushCounters() {} - - override fun flushCounters(input: FlowConsumer) {} - - override fun onConverge(now: Long) { - listener?.onConverge(now) - } - - /** - * An input on the multiplexer. - */ - private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { - override fun toString(): String = "ForwardingFlowMultiplexer.Input" - } - - /** - * An output on the multiplexer. - */ - private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { - override fun onStart(conn: FlowConnection, now: Long) { - _availableOutputs += this - forwarder.onStart(conn, now) - } - - override fun onStop(conn: FlowConnection, now: Long) { - forwarder.cancel() - forwarder.onStop(conn, now) - } - - override fun toString(): String = "ForwardingFlowMultiplexer.Output" - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt deleted file mode 100644 index d9c6f893..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ /dev/null @@ -1,811 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowConsumer -import org.opendc.simulator.flow.FlowConsumerContext -import org.opendc.simulator.flow.FlowConsumerLogic -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowCounters -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.internal.D_MS_TO_S -import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.min - -/** - * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing. - * - * @param engine The [FlowEngine] to drive the flow simulation. - * @param parent The parent flow system of the multiplexer. - */ -public class MaxMinFlowMultiplexer( - private val engine: FlowEngine, - parent: FlowConvergenceListener? = null -) : FlowMultiplexer { - - override val maxInputs: Int = Int.MAX_VALUE - - override val maxOutputs: Int = Int.MAX_VALUE - - /** - * The inputs of the multiplexer. - */ - override val inputs: Set<FlowConsumer> - get() = _inputs - private val _inputs = mutableSetOf<Input>() - - /** - * The outputs of the multiplexer. - */ - override val outputs: Set<FlowSource> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - - /** - * The flow counters of this multiplexer. - */ - public override val counters: FlowCounters - get() = scheduler.counters - - /** - * The actual processing rate of the multiplexer. - */ - public override val rate: Double - get() = scheduler.rate - - /** - * The demanded processing rate of the input. - */ - public override val demand: Double - get() = scheduler.demand - - /** - * The capacity of the outputs. - */ - public override val capacity: Double - get() = scheduler.capacity - - /** - * The [Scheduler] instance of this multiplexer. - */ - private val scheduler = Scheduler(engine, parent) - - override fun newInput(): FlowConsumer { - return newInput(isCoupled = true, Double.POSITIVE_INFINITY) - } - - override fun newInput(capacity: Double): FlowConsumer { - return newInput(isCoupled = false, capacity) - } - - private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer { - val provider = Input(engine, scheduler, isCoupled, initialCapacity) - _inputs.add(provider) - return provider - } - - override fun removeInput(input: FlowConsumer) { - if (!_inputs.remove(input)) { - return - } - // This cast should always succeed since only `Input` instances should be added to `_inputs` - (input as Input).close() - } - - override fun newOutput(): FlowSource { - val output = Output(scheduler) - _outputs.add(output) - return output - } - - override fun removeOutput(output: FlowSource) { - if (!_outputs.remove(output)) { - return - } - - // This cast should always succeed since only `Output` instances should be added to `_outputs` - (output as Output).cancel() - } - - override fun clearInputs() { - for (input in _inputs) { - input.cancel() - } - _inputs.clear() - } - - override fun clearOutputs() { - for (output in _outputs) { - output.cancel() - } - _outputs.clear() - } - - override fun clear() { - clearOutputs() - clearInputs() - } - - override fun flushCounters() { - scheduler.updateCounters(engine.clock.millis()) - } - - override fun flushCounters(input: FlowConsumer) { - (input as Input).doUpdateCounters(engine.clock.millis()) - } - - /** - * Helper class containing the scheduler state. - */ - private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { - /** - * The flow counters of this scheduler. - */ - @JvmField val counters = MutableFlowCounters() - - /** - * The flow rate of the multiplexer. - */ - @JvmField var rate = 0.0 - - /** - * The demand for the multiplexer. - */ - @JvmField var demand = 0.0 - - /** - * The capacity of the multiplexer. - */ - @JvmField var capacity = 0.0 - - /** - * An [Output] that is used to activate the scheduler. - */ - @JvmField var activationOutput: Output? = null - - /** - * The active inputs registered with the scheduler. - */ - private val _activeInputs = mutableListOf<Input>() - - /** - * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList]. - */ - private var _inputArray = emptyArray<Input>() - - /** - * The active outputs registered with the scheduler. - */ - private val _activeOutputs = mutableListOf<Output>() - - /** - * Flag to indicate that the scheduler is active. - */ - private var _schedulerActive = false - - /** - * The last convergence timestamp and the input. - */ - private var _lastConverge: Long = Long.MIN_VALUE - private var _lastConvergeInput: Input? = null - - /** - * The simulation clock. - */ - private val _clock = engine.clock - - /** - * Register the specified [input] to this scheduler. - */ - fun registerInput(input: Input) { - _activeInputs.add(input) - _inputArray = _activeInputs.toTypedArray() - - val hasActivationOutput = activationOutput != null - - // Disable timers and convergence of the source if one of the output manages it - input.shouldConsumerConverge = !hasActivationOutput - input.enableTimers = !hasActivationOutput - - if (input.isCoupled) { - input.capacity = capacity - } - - trigger(_clock.millis()) - } - - /** - * De-register the specified [input] from this scheduler. - */ - fun deregisterInput(input: Input, now: Long) { - // Assign a new input responsible for handling the convergence events - if (_lastConvergeInput == input) { - _lastConvergeInput = null - } - - _activeInputs.remove(input) - - // Re-run scheduler to distribute new load - trigger(now) - } - - /** - * This method is invoked when one of the inputs converges. - */ - fun convergeInput(input: Input, now: Long) { - val lastConverge = _lastConverge - val lastConvergeInput = _lastConvergeInput - val parent = parent - - if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) { - _lastConverge = now - _lastConvergeInput = input - - parent.onConverge(now) - } - } - - /** - * Register the specified [output] to this scheduler. - */ - fun registerOutput(output: Output) { - _activeOutputs.add(output) - - updateCapacity() - updateActivationOutput() - } - - /** - * De-register the specified [output] from this scheduler. - */ - fun deregisterOutput(output: Output, now: Long) { - _activeOutputs.remove(output) - updateCapacity() - - trigger(now) - } - - /** - * This method is invoked when one of the outputs converges. - */ - fun convergeOutput(output: Output, now: Long) { - val parent = parent - - if (parent != null) { - _lastConverge = now - parent.onConverge(now) - } - - if (!output.isActive) { - output.isActivationOutput = false - updateActivationOutput() - } - } - - /** - * Trigger the scheduler of the multiplexer. - * - * @param now The current virtual timestamp of the simulation. - */ - fun trigger(now: Long) { - if (_schedulerActive) { - // No need to trigger the scheduler in case it is already active - return - } - - val activationOutput = activationOutput - - // We can run the scheduler in two ways: - // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input - // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. - // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only - // a few inputs and little changes at the same timestamp. - // We always pick for option (1) unless there are no outputs available. - if (activationOutput != null) { - activationOutput.pull(now) - return - } else { - runScheduler(now) - } - } - - /** - * Synchronously run the scheduler of the multiplexer. - */ - fun runScheduler(now: Long): Long { - return try { - _schedulerActive = true - doRunScheduler(now) - } finally { - _schedulerActive = false - } - } - - /** - * Recompute the capacity of the multiplexer. - */ - fun updateCapacity() { - val newCapacity = _activeOutputs.sumOf(Output::capacity) - - // No-op if the capacity is unchanged - if (capacity == newCapacity) { - return - } - - capacity = newCapacity - - for (input in _activeInputs) { - if (input.isCoupled) { - input.capacity = newCapacity - } - } - - // Sort outputs by their capacity - _activeOutputs.sort() - } - - /** - * Updates the output that is used for scheduler activation. - */ - private fun updateActivationOutput() { - val output = _activeOutputs.firstOrNull() - activationOutput = output - - if (output != null) { - output.isActivationOutput = true - } - - val hasActivationOutput = output != null - - for (input in _activeInputs) { - input.shouldConsumerConverge = !hasActivationOutput - input.enableTimers = !hasActivationOutput - } - } - - /** - * Schedule the inputs over the outputs. - * - * @return The deadline after which a new scheduling cycle should start. - */ - private fun doRunScheduler(now: Long): Long { - val activeInputs = _activeInputs - val activeOutputs = _activeOutputs - var inputArray = _inputArray - var inputSize = _inputArray.size - - // Update the counters of the scheduler - updateCounters(now) - - // If there is no work yet, mark the inputs as idle. - if (inputSize == 0) { - demand = 0.0 - rate = 0.0 - return Long.MAX_VALUE - } - - val capacity = capacity - var availableCapacity = capacity - var deadline = Long.MAX_VALUE - var demand = 0.0 - var shouldRebuild = false - - // Pull in the work of the inputs - for (i in 0 until inputSize) { - val input = inputArray[i] - - input.pullSync(now) - - // Remove inputs that have finished - if (!input.isActive) { - input.actualRate = 0.0 - shouldRebuild = true - } else { - demand += input.limit - deadline = min(deadline, input.deadline) - } - } - - // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs` - if (shouldRebuild) { - inputArray = activeInputs.toTypedArray() - inputSize = inputArray.size - _inputArray = inputArray - } - - val rate = if (demand > capacity) { - // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the - // constrained capacity across the inputs. - - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - inputArray.sort() - - // Divide the available output capacity fairly over the inputs using max-min fair sharing - for (i in 0 until inputSize) { - val input = inputArray[i] - val availableShare = availableCapacity / (inputSize - i) - val grantedRate = min(input.allowedRate, availableShare) - - availableCapacity -= grantedRate - input.actualRate = grantedRate - } - - capacity - availableCapacity - } else { - demand - } - - this.demand = demand - if (this.rate != rate) { - // Only update the outputs if the output rate has changed - this.rate = rate - - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - output.push(grantedSpeed) - } - } - - return deadline - } - - /** - * The previous capacity of the multiplexer. - */ - private var _previousCapacity = 0.0 - private var _previousUpdate = Long.MIN_VALUE - - /** - * Update the counters of the scheduler. - */ - fun updateCounters(now: Long) { - val previousCapacity = _previousCapacity - _previousCapacity = capacity - - val previousUpdate = _previousUpdate - _previousUpdate = now - - val delta = now - previousUpdate - if (delta <= 0) { - return - } - - val deltaS = delta * D_MS_TO_S - val demand = demand - val rate = rate - - counters.increment( - demand = demand * deltaS, - actual = rate * deltaS, - remaining = (previousCapacity - rate) * deltaS - ) - } - } - - /** - * An internal [FlowConsumer] implementation for multiplexer inputs. - */ - private class Input( - private val engine: FlowEngine, - private val scheduler: Scheduler, - @JvmField val isCoupled: Boolean, - initialCapacity: Double - ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { - /** - * A flag to indicate that the consumer is active. - */ - override val isActive: Boolean - get() = _ctx != null - - /** - * The demand of the consumer. - */ - override val demand: Double - get() = limit - - /** - * The processing rate of the consumer. - */ - override val rate: Double - get() = actualRate - - /** - * The capacity of the input. - */ - override var capacity: Double - get() = _capacity - set(value) { - allowedRate = min(limit, value) - _capacity = value - _ctx?.capacity = value - } - private var _capacity = initialCapacity - - /** - * The flow counters to track the flow metrics of the consumer. - */ - override val counters: FlowCounters - get() = _counters - private val _counters = MutableFlowCounters() - - /** - * A flag to enable timers for the input. - */ - var enableTimers: Boolean = true - set(value) { - field = value - _ctx?.enableTimers = value - } - - /** - * A flag to control whether the input should converge. - */ - var shouldConsumerConverge: Boolean = true - set(value) { - field = value - _ctx?.shouldConsumerConverge = value - } - - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - - /** - * The actual processing speed. - */ - @JvmField var actualRate: Double = 0.0 - - /** - * The processing rate that is allowed by the model constraints. - */ - @JvmField var allowedRate: Double = 0.0 - - /** - * The deadline of the input. - */ - val deadline: Long - get() = _ctx?.deadline ?: Long.MAX_VALUE - - /** - * The [FlowConsumerContext] that is currently running. - */ - private var _ctx: FlowConsumerContext? = null - - /** - * A flag to indicate that the input is closed. - */ - private var _isClosed: Boolean = false - - /** - * Close the input. - * - * This method is invoked when the user removes an input from the switch. - */ - fun close() { - _isClosed = true - cancel() - } - - /** - * Pull the source if necessary. - */ - fun pullSync(now: Long) { - _ctx?.pullSync(now) - } - - /* FlowConsumer */ - override fun startConsumer(source: FlowSource) { - check(!_isClosed) { "Cannot re-use closed input" } - check(_ctx == null) { "Consumer is in invalid state" } - - val ctx = engine.newContext(source, this) - _ctx = ctx - - ctx.capacity = capacity - scheduler.registerInput(this) - - ctx.start() - } - - override fun pull() { - _ctx?.pull() - } - - override fun cancel() { - _ctx?.close() - } - - /* FlowConsumerLogic */ - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - rate: Double - ) { - doUpdateCounters(now) - - val allowed = min(rate, capacity) - limit = rate - actualRate = allowed - allowedRate = allowed - - scheduler.trigger(now) - } - - override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { - doUpdateCounters(now) - - limit = 0.0 - actualRate = 0.0 - allowedRate = 0.0 - - scheduler.deregisterInput(this, now) - - _ctx = null - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long) { - scheduler.convergeInput(this, now) - } - - /* Comparable */ - override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) - - /** - * The timestamp that the counters where last updated. - */ - private var _lastUpdate = Long.MIN_VALUE - - /** - * Helper method to update the flow counters of the multiplexer. - */ - fun doUpdateCounters(now: Long) { - val lastUpdate = _lastUpdate - _lastUpdate = now - - val delta = (now - lastUpdate).coerceAtLeast(0) - if (delta <= 0L) { - return - } - - val actualRate = actualRate - - val deltaS = delta * D_MS_TO_S - val demand = limit * deltaS - val actual = actualRate * deltaS - val remaining = (_capacity - actualRate) * deltaS - - _counters.increment(demand, actual, remaining) - scheduler.counters.increment(0.0, 0.0, 0.0) - } - } - - /** - * An internal [FlowSource] implementation for multiplexer outputs. - */ - private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> { - /** - * The active [FlowConnection] of this source. - */ - private var _conn: FlowConnection? = null - - /** - * The capacity of this output. - */ - @JvmField var capacity: Double = 0.0 - - /** - * A flag to indicate that this output is the activation output. - */ - var isActivationOutput: Boolean - get() = _isActivationOutput - set(value) { - _isActivationOutput = value - _conn?.shouldSourceConverge = value - } - private var _isActivationOutput: Boolean = false - - /** - * A flag to indicate that the output is active. - */ - @JvmField var isActive = false - - /** - * Push the specified rate to the consumer. - */ - fun push(rate: Double) { - _conn?.push(rate) - } - - /** - * Cancel this output. - */ - fun cancel() { - _conn?.close() - } - - /** - * Pull this output. - */ - fun pull(now: Long) { - _conn?.pull(now) - } - - override fun onStart(conn: FlowConnection, now: Long) { - assert(_conn == null) { "Source running concurrently" } - _conn = conn - capacity = conn.capacity - isActive = true - - scheduler.registerOutput(this) - } - - override fun onStop(conn: FlowConnection, now: Long) { - _conn = null - capacity = 0.0 - isActive = false - - scheduler.deregisterOutput(this, now) - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val capacity = capacity - if (capacity != conn.capacity) { - this.capacity = capacity - scheduler.updateCapacity() - } - - return if (_isActivationOutput) { - // If this output is the activation output, synchronously run the scheduler and return the new deadline - val deadline = scheduler.runScheduler(now) - if (deadline == Long.MAX_VALUE) { - deadline - } else { - deadline - now - } - } else { - // Output is not the activation output, so trigger activation output and do not install timer for this - // output (by returning `Long.MAX_VALUE`) - scheduler.trigger(now) - - Long.MAX_VALUE - } - } - - override fun onConverge(conn: FlowConnection, now: Long) { - if (_isActivationOutput) { - scheduler.convergeOutput(this, now) - } - } - - override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt deleted file mode 100644 index 6cfcc82c..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource -import kotlin.math.roundToLong - -/** - * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization]. - */ -public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource { - - init { - require(amount >= 0.0) { "Amount must be positive" } - require(utilization > 0.0) { "Utilization must be positive" } - } - - private var remainingAmount = amount - private var lastPull: Long = 0L - - override fun onStart(conn: FlowConnection, now: Long) { - lastPull = now - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val lastPull = lastPull - this.lastPull = now - val delta = (now - lastPull).coerceAtLeast(0) - - val consumed = conn.rate * delta / 1000.0 - val limit = conn.capacity * utilization - - remainingAmount -= consumed - - val duration = (remainingAmount / limit * 1000).roundToLong() - - return if (duration > 0) { - conn.push(limit) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt deleted file mode 100644 index 80127fb5..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource - -/** - * Helper class to expose an observable [rate] field describing the flow rate of the source. - */ -public class FlowSourceRateAdapter( - private val delegate: FlowSource, - private val callback: (Double) -> Unit = {} -) : FlowSource by delegate { - /** - * The resource processing speed at this instant. - */ - public var rate: Double = 0.0 - private set(value) { - if (field != value) { - callback(value) - field = value - } - } - - init { - callback(0.0) - } - - override fun onStart(conn: FlowConnection, now: Long) { - conn.shouldSourceConverge = true - - delegate.onStart(conn, now) - } - - override fun onStop(conn: FlowConnection, now: Long) { - try { - delegate.onStop(conn, now) - } finally { - rate = 0.0 - } - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return delegate.onPull(conn, now) - } - - override fun onConverge(conn: FlowConnection, now: Long) { - try { - delegate.onConverge(conn, now) - } finally { - rate = conn.rate - } - } - - override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt deleted file mode 100644 index c9a52128..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource - -/** - * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time. - */ -public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource { - private var _iterator: Iterator<Fragment>? = null - private var _nextTarget = Long.MIN_VALUE - - override fun onStart(conn: FlowConnection, now: Long) { - check(_iterator == null) { "Source already running" } - _iterator = trace.iterator() - } - - override fun onStop(conn: FlowConnection, now: Long) { - _iterator = null - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - // Check whether the trace fragment was fully consumed, otherwise wait until we have done so - val nextTarget = _nextTarget - if (nextTarget > now) { - return now - nextTarget - } - - val iterator = checkNotNull(_iterator) - return if (iterator.hasNext()) { - val fragment = iterator.next() - _nextTarget = now + fragment.duration - conn.push(fragment.usage) - fragment.duration - } else { - conn.close() - Long.MAX_VALUE - } - } - - /** - * A fragment of the trace. - */ - public data class Fragment(val duration: Long, val usage: Double) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt deleted file mode 100644 index f89133dd..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import net.bytebuddy.matcher.ElementMatchers.any -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowConsumerContextImpl -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowConsumerContextImpl] class. - */ -class FlowConsumerContextTest { - @Test - fun testFlushWithoutCommand() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(1.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - - engine.scheduleSync(engine.clock.millis(), context) - } - - @Test - fun testDoubleStart() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(0.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - - context.start() - - assertThrows<IllegalStateException> { - context.start() - } - } - - @Test - fun testIdempotentCapacityChange() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(1.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - }) - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - context.capacity = 4200.0 - context.start() - context.capacity = 4200.0 - - verify(exactly = 1) { consumer.onPull(any(), any()) } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt deleted file mode 100644 index f75e5037..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import net.bytebuddy.matcher.ElementMatchers.any -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Disabled -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowForwarder] class. - */ -internal class FlowForwarderTest { - @Test - fun testCancelImmediately() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testCancel() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(object : FlowSource { - var isFirst = true - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 10 * 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testState() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - } - - assertFalse(forwarder.isActive) - - forwarder.startConsumer(consumer) - assertTrue(forwarder.isActive) - - assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } - - forwarder.cancel() - assertFalse(forwarder.isActive) - - forwarder.close() - assertFalse(forwarder.isActive) - } - - @Test - fun testCancelPendingDelegate() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - }) - - forwarder.startConsumer(consumer) - forwarder.cancel() - - verify(exactly = 0) { consumer.onStop(any(), any()) } - } - - @Test - fun testCancelStartedDelegate() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - val consumer = spyk(FixedFlowSource(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - forwarder.cancel() - - verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any()) } - } - - @Test - fun testCancelPropagation() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - val consumer = spyk(FixedFlowSource(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - source.cancel() - - verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any()) } - } - - @Test - fun testExitPropagation() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - } - - source.startConsumer(forwarder) - forwarder.consume(consumer) - yield() - - assertFalse(forwarder.isActive) - } - - @Test - @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368 - fun testAdjustCapacity() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val sink = FlowSink(engine, 1.0) - - val source = spyk(FixedFlowSource(2.0, 1.0)) - sink.startConsumer(forwarder) - - coroutineScope { - launch { forwarder.consume(source) } - delay(1000) - sink.capacity = 0.5 - } - - assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any()) } - } - - @Test - fun testCounters() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 1.0) - - val consumer = FixedFlowSource(2.0, 1.0) - source.startConsumer(forwarder) - - forwarder.consume(consumer) - - yield() - - assertAll( - { assertEquals(2.0, source.counters.actual) }, - { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, - { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, - { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, - { assertEquals(2000, clock.millis()) } - ) - } - - @Test - fun testCoupledExit() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(FixedFlowSource(2000.0, 1.0)) - - yield() - - assertFalse(source.isActive) - } - - @Test - fun testPullFailureCoupled() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertFalse(source.isActive) - } - - @Test - fun testStartFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - - override fun onStart(conn: FlowConnection, now: Long) { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertTrue(source.isActive) - source.cancel() - } - - @Test - fun testConvergeFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - conn.shouldSourceConverge = true - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - - override fun onConverge(conn: FlowConnection, now: Long) { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertTrue(source.isActive) - source.cancel() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt deleted file mode 100644 index 746d752d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowSink] class. - */ -internal class FlowSinkTest { - @Test - fun testSpeed() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(4200.0, 1.0) - - val res = mutableListOf<Double>() - val adapter = FlowSourceRateAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } - - @Test - fun testAdjustCapacity() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(engine, 1.0) - - val consumer = spyk(FixedFlowSource(2.0, 1.0)) - - coroutineScope { - launch { provider.consume(consumer) } - delay(1000) - provider.capacity = 0.5 - } - assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any()) } - } - - @Test - fun testSpeedLimit() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 2.0) - - val res = mutableListOf<Double>() - val adapter = FlowSourceRateAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } - - /** - * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or - * [FlowSource.onPull]. - */ - @Test - fun testIntermediateInterrupt() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - - override fun onStart(conn: FlowConnection, now: Long) { - conn.pull() - } - } - - provider.consume(consumer) - } - - @Test - fun testInterrupt() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - lateinit var resCtx: FlowConnection - - val consumer = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - resCtx = conn - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 4000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - launch { - yield() - resCtx.pull() - } - provider.consume(consumer) - - assertEquals(0, clock.millis()) - } - - @Test - fun testFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - throw IllegalStateException("Hi") - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - } - - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } - - @Test - fun testExceptionPropagationOnNext() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - var isFirst = true - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 1000 - } else { - throw IllegalStateException() - } - } - } - - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } - - @Test - fun testConcurrentConsumption() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 1.0) - - assertThrows<IllegalStateException> { - coroutineScope { - launch { provider.consume(consumer) } - provider.consume(consumer) - } - } - } - - @Test - fun testCancelDuringConsumption() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 1.0) - - launch { provider.consume(consumer) } - delay(500) - provider.cancel() - - yield() - - assertEquals(500, clock.millis()) - } - - @Test - fun testInfiniteSleep() { - assertThrows<IllegalStateException> { - runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE - } - - provider.consume(consumer) - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt deleted file mode 100644 index 2409e174..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowForwarder -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class ForwardingFlowMultiplexerTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf<Double>() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - forwarder.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - isFirst = true - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - switch.newInput() - assertThrows<IllegalStateException> { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt deleted file mode 100644 index a6bf8ad8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * Test suite for the [FlowMultiplexer] implementations - */ -internal class MaxMinFlowMultiplexerTest { - @Test - fun testSmoke() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(scheduler) - - val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { it.startConsumer(switch.newOutput()) } - - val provider = switch.newInput() - val consumer = FixedFlowSource(2000.0, 1.0) - - try { - provider.consume(consumer) - yield() - } finally { - switch.clear() - } - } - - /** - * Test overcommitting of resources via the hypervisor with a single VM. - */ - @Test - fun testOvercommittedSingle() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val sink = FlowSink(scheduler, 3200.0) - val provider = switch.newInput() - - try { - sink.startConsumer(switch.newOutput()) - provider.consume(workload) - yield() - } finally { - switch.clear() - } - - assertAll( - { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } - - /** - * Test overcommitting of resources via the hypervisor with two VMs. - */ - @Test - fun testOvercommittedDual() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workloadA = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - val workloadB = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3100.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 73.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val sink = FlowSink(scheduler, 3200.0) - val providerA = switch.newInput() - val providerB = switch.newInput() - - try { - sink.startConsumer(switch.newOutput()) - - coroutineScope { - launch { providerA.consume(workloadA) } - providerB.consume(workloadB) - } - - yield() - } finally { - switch.clear() - } - assertAll( - { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt new file mode 100644 index 00000000..839835ce --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.flow2.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.SimpleFlowSource +import org.opendc.simulator.kotlin.runSimulation + +/** + * Smoke tests for the Flow API. + */ +class FlowEngineTest { + @Test + fun testSmoke() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val multiplexer = MaxMinFlowMultiplexer(graph) + val sink = SimpleFlowSink(graph, 2.0f) + + graph.connect(multiplexer.newOutput(), sink.input) + + val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) + val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f) + + graph.connect(sourceA.output, multiplexer.newInput()) + graph.connect(sourceB.output, multiplexer.newInput()) + } + + @Test + fun testConnectInvalidInlet() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val inlet = mockk<Inlet>() + val source = SimpleFlowSource(graph, 2000.0f, 0.8f) + assertThrows<IllegalArgumentException> { graph.connect(source.output, inlet) } + } + + @Test + fun testConnectInvalidOutlet() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val outlet = mockk<Outlet>() + val sink = SimpleFlowSink(graph, 2.0f) + assertThrows<IllegalArgumentException> { graph.connect(outlet, sink.input) } + } + + @Test + fun testConnectInletBelongsToDifferentGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val sink = SimpleFlowSink(graphB, 2.0f) + val source = SimpleFlowSource(graphA, 2000.0f, 0.8f) + + assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) } + } + + @Test + fun testConnectOutletBelongsToDifferentGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val sink = SimpleFlowSink(graphA, 2.0f) + val source = SimpleFlowSource(graphB, 2000.0f, 0.8f) + + assertThrows<IllegalArgumentException> { graphA.connect(source.output, sink.input) } + } + + @Test + fun testConnectInletAlreadyConnected() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 2.0f) + val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) + val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f) + + graph.connect(sourceA.output, sink.input) + assertThrows<IllegalStateException> { graph.connect(sourceB.output, sink.input) } + } + + @Test + fun testConnectOutletAlreadyConnected() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sinkA = SimpleFlowSink(graph, 2.0f) + val sinkB = SimpleFlowSink(graph, 2.0f) + val source = SimpleFlowSource(graph, 2000.0f, 0.8f) + + graph.connect(source.output, sinkA.input) + assertThrows<IllegalStateException> { graph.connect(source.output, sinkB.input) } + } + + @Test + fun testDisconnectInletInvalid() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val inlet = mockk<Inlet>() + assertThrows<IllegalArgumentException> { graph.disconnect(inlet) } + } + + @Test + fun testDisconnectOutletInvalid() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val outlet = mockk<Outlet>() + assertThrows<IllegalArgumentException> { graph.disconnect(outlet) } + } + + @Test + fun testDisconnectInletInvalidGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val sink = SimpleFlowSink(graphA, 2.0f) + + assertThrows<IllegalArgumentException> { graphB.disconnect(sink.input) } + } + + @Test + fun testDisconnectOutletInvalidGraph() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graphA = engine.newGraph() + val graphB = engine.newGraph() + + val source = SimpleFlowSource(graphA, 2000.0f, 0.8f) + + assertThrows<IllegalArgumentException> { graphB.disconnect(source.output) } + } + + @Test + fun testInletEquality() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sinkA = SimpleFlowSink(graph, 2.0f) + val sinkB = SimpleFlowSink(graph, 2.0f) + + val multiplexer = MaxMinFlowMultiplexer(graph) + + assertEquals(sinkA.input, sinkA.input) + assertNotEquals(sinkA.input, sinkB.input) + + assertNotEquals(multiplexer.newInput(), multiplexer.newInput()) + } + + @Test + fun testOutletEquality() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) + val sourceB = SimpleFlowSource(graph, 2000.0f, 0.8f) + + val multiplexer = MaxMinFlowMultiplexer(graph) + + assertEquals(sourceA.output, sourceA.output) + assertNotEquals(sourceA.output, sourceB.output) + + assertNotEquals(multiplexer.newOutput(), multiplexer.newOutput()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt new file mode 100644 index 00000000..1824959c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowTimerQueueTest.kt @@ -0,0 +1,385 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import io.mockk.mockk +import org.junit.jupiter.api.Assertions.assertAll +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * Test suite for the [FlowTimerQueue] class. + */ +class FlowTimerQueueTest { + private lateinit var queue: FlowTimerQueue + + @BeforeEach + fun setUp() { + queue = FlowTimerQueue(3) + } + + /** + * Test whether a call to [FlowTimerQueue.poll] returns `null` for an empty queue. + */ + @Test + fun testPollEmpty() { + assertAll( + { assertEquals(Long.MAX_VALUE, queue.peekDeadline()) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test whether a call to [FlowTimerQueue.poll] returns the proper value for a queue with a single entry. + */ + @Test + fun testSingleEntry() { + val entry = mockk<FlowStage>() + entry.deadline = 100 + entry.timerIndex = -1 + + queue.enqueue(entry) + + assertAll( + { assertEquals(100, queue.peekDeadline()) }, + { assertNull(queue.poll(10L)) }, + { assertEquals(entry, queue.poll(200L)) }, + { assertNull(queue.poll(200L)) } + ) + } + + /** + * Test whether [FlowTimerQueue.poll] returns values in the queue in the proper order. + */ + @Test + fun testMultipleEntries() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 10 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + assertAll( + { assertEquals(10, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that the queue is properly resized when the number of entries exceed the capacity. + */ + @Test + fun testResize() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + val entryD = mockk<FlowStage>() + entryD.deadline = 31 + entryD.timerIndex = -1 + + queue.enqueue(entryD) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryD, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test to verify that we can change the deadline of the last element in the queue. + */ + @Test + fun testChangeDeadlineTail() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryA.deadline = 10 + queue.enqueue(entryA) + + assertAll( + { assertEquals(10, queue.peekDeadline()) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can change the deadline of the head entry in the queue. + */ + @Test + fun testChangeDeadlineMiddle() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryC.deadline = 10 + queue.enqueue(entryC) + + assertAll( + { assertEquals(10, queue.peekDeadline()) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can change the deadline of the head entry in the queue. + */ + @Test + fun testChangeDeadlineHead() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryB.deadline = 30 + queue.enqueue(entryB) + + assertAll( + { assertEquals(30, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that an unchanged deadline results in a no-op. + */ + @Test + fun testChangeDeadlineNop() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + // Should be a no-op + queue.enqueue(entryA) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can remove an entry from the end of the queue. + */ + @Test + fun testRemoveEntryTail() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryC.deadline = Long.MAX_VALUE + queue.enqueue(entryC) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can remove an entry from the head of the queue. + */ + @Test + fun testRemoveEntryHead() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryB.deadline = Long.MAX_VALUE + queue.enqueue(entryB) + + assertAll( + { assertEquals(58, queue.peekDeadline()) }, + { assertEquals(entryC, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } + + /** + * Test that we can remove an entry from the middle of a queue. + */ + @Test + fun testRemoveEntryMiddle() { + val entryA = mockk<FlowStage>() + entryA.deadline = 100 + entryA.timerIndex = -1 + + queue.enqueue(entryA) + + val entryB = mockk<FlowStage>() + entryB.deadline = 20 + entryB.timerIndex = -1 + + queue.enqueue(entryB) + + val entryC = mockk<FlowStage>() + entryC.deadline = 58 + entryC.timerIndex = -1 + + queue.enqueue(entryC) + + entryC.deadline = Long.MAX_VALUE + queue.enqueue(entryC) + + assertAll( + { assertEquals(20, queue.peekDeadline()) }, + { assertEquals(entryB, queue.poll(100L)) }, + { assertEquals(entryA, queue.poll(100L)) }, + { assertNull(queue.poll(100L)) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt new file mode 100644 index 00000000..2250fe87 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/InvocationStackTest.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2 + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +/** + * Test suite for the [InvocationStack] class. + */ +class InvocationStackTest { + private val stack = InvocationStack(2) + + @Test + fun testPollEmpty() { + assertEquals(Long.MAX_VALUE, stack.poll()) + } + + @Test + fun testAddSingle() { + assertTrue(stack.tryAdd(10)) + assertEquals(10, stack.poll()) + } + + @Test + fun testAddLater() { + assertTrue(stack.tryAdd(10)) + assertFalse(stack.tryAdd(15)) + assertEquals(10, stack.poll()) + } + + @Test + fun testAddEarlier() { + assertTrue(stack.tryAdd(10)) + assertTrue(stack.tryAdd(5)) + assertEquals(5, stack.poll()) + assertEquals(10, stack.poll()) + } + + @Test + fun testCapacityExceeded() { + assertTrue(stack.tryAdd(10)) + assertTrue(stack.tryAdd(5)) + assertTrue(stack.tryAdd(2)) + assertEquals(2, stack.poll()) + assertEquals(5, stack.poll()) + assertEquals(10, stack.poll()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt new file mode 100644 index 00000000..a2ed2195 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.mux + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.TraceFlowSource +import org.opendc.simulator.kotlin.runSimulation + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +class ForwardingFlowMultiplexerTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val switch = ForwardingFlowMultiplexer(graph) + val sink = SimpleFlowSink(graph, 3200.0f) + graph.connect(switch.newOutput(), sink.input) + + val workload = + TraceFlowSource( + graph, + TraceFlowSource.Trace( + longArrayOf(1000, 2000, 3000, 4000), + floatArrayOf(28.0f, 3500.0f, 0.0f, 183.0f), + 4 + ) + ) + graph.connect(workload.output, switch.newInput()) + + advanceUntilIdle() + + assertAll( + { assertEquals(4000, clock.millis()) { "Took enough time" } } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt index 552579ff..ba339ee3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 AtLarge Research + * Copyright (c) 2022 AtLarge Research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -20,38 +20,35 @@ * SOFTWARE. */ -package org.opendc.simulator.flow.source +package org.opendc.simulator.flow2.mux import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.sink.SimpleFlowSink +import org.opendc.simulator.flow2.source.SimpleFlowSource import org.opendc.simulator.kotlin.runSimulation /** - * A test suite for the [FixedFlowSource] class. + * Test suite for the [MaxMinFlowMultiplexer] class. */ -internal class FixedFlowSourceTest { +class MaxMinFlowMultiplexerTest { @Test fun testSmoke() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(scheduler, 1.0) + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val switch = MaxMinFlowMultiplexer(graph) - val consumer = FixedFlowSource(1.0, 1.0) + val sinks = List(2) { SimpleFlowSink(graph, 2000.0f) } + for (source in sinks) { + graph.connect(switch.newOutput(), source.input) + } - provider.consume(consumer) - assertEquals(1000, clock.millis()) - } - - @Test - fun testUtilization() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(scheduler, 1.0) + val source = SimpleFlowSource(graph, 2000.0f, 1.0f) + graph.connect(source.output, switch.newInput()) - val consumer = FixedFlowSource(1.0, 0.5) + advanceUntilIdle() - provider.consume(consumer) - assertEquals(2000, clock.millis()) + assertEquals(500, clock.millis()) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt new file mode 100644 index 00000000..a75efba3 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2022 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow2.sink + +import kotlinx.coroutines.delay +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.flow2.FlowEngine +import org.opendc.simulator.flow2.source.SimpleFlowSource +import org.opendc.simulator.flow2.source.TraceFlowSource +import org.opendc.simulator.kotlin.runSimulation +import java.util.concurrent.ThreadLocalRandom + +/** + * Test suite for the [SimpleFlowSink] class. + */ +class FlowSinkTest { + @Test + fun testSmoke() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val source = SimpleFlowSource(graph, 2.0f, 1.0f) + + graph.connect(source.output, sink.input) + advanceUntilIdle() + + assertEquals(2000, clock.millis()) + } + + @Test + fun testAdjustCapacity() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val source = SimpleFlowSource(graph, 2.0f, 1.0f) + + graph.connect(source.output, sink.input) + + delay(1000) + sink.capacity = 0.5f + + advanceUntilIdle() + + assertEquals(3000, clock.millis()) + } + + @Test + fun testUtilization() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val source = SimpleFlowSource(graph, 2.0f, 0.5f) + + graph.connect(source.output, sink.input) + advanceUntilIdle() + + assertEquals(4000, clock.millis()) + } + + @Test + fun testFragments() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val sink = SimpleFlowSink(graph, 1.0f) + val trace = TraceFlowSource.Trace( + longArrayOf(1000, 2000, 3000, 4000), + floatArrayOf(1.0f, 0.5f, 2.0f, 1.0f), + 4 + ) + val source = TraceFlowSource( + graph, + trace + ) + + graph.connect(source.output, sink.input) + advanceUntilIdle() + + assertEquals(4000, clock.millis()) + } + + @Test + fun benchmarkSink() { + val random = ThreadLocalRandom.current() + val traceSize = 10000000 + val trace = TraceFlowSource.Trace( + LongArray(traceSize) { it * 1000L }, + FloatArray(traceSize) { random.nextDouble(0.0, 4500.0).toFloat() }, + traceSize + ) + + return runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + val sink = SimpleFlowSink(graph, 4200.0f) + val source = TraceFlowSource(graph, trace) + graph.connect(source.output, sink.input) + } + } +} |
