diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
9 files changed, 48 insertions, 52 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt index fb112082..59dd3bad 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt @@ -60,7 +60,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkSink() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 4200.0f) val source = TraceFlowSource(graph, trace) @@ -71,7 +71,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkForward() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 4200.0f) val source = TraceFlowSource(graph, trace) @@ -85,7 +85,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkMuxMaxMinSingleSource() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = MaxMinFlowMultiplexer(graph) @@ -103,7 +103,7 @@ class FlowBenchmarks { @Benchmark fun benchmarkMuxMaxMinTripleSource() { return runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = MaxMinFlowMultiplexer(graph) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java index 0ebb0da9..c0f52505 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java @@ -23,11 +23,11 @@ package org.opendc.simulator.flow2; import java.time.Clock; +import java.time.InstantSource; import java.util.ArrayList; import java.util.List; -import kotlin.coroutines.ContinuationInterceptor; import kotlin.coroutines.CoroutineContext; -import kotlinx.coroutines.Delay; +import org.opendc.common.Dispatcher; /** * A {@link FlowEngine} simulates a generic flow network. @@ -56,29 +56,25 @@ public final class FlowEngine implements Runnable { */ private boolean active; - private final CoroutineContext coroutineContext; - private final Clock clock; - private final Delay delay; + private final Dispatcher dispatcher; + private final InstantSource clock; /** - * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}. + * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}. */ - public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) { - return new FlowEngine(coroutineContext, clock); + public static FlowEngine create(Dispatcher dispatcher) { + return new FlowEngine(dispatcher); } - FlowEngine(CoroutineContext coroutineContext, Clock clock) { - this.coroutineContext = coroutineContext; - this.clock = clock; - - CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key; - this.delay = (Delay) coroutineContext.get(key); + FlowEngine(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + this.clock = dispatcher.getTimeSource(); } /** * Obtain the (virtual) {@link Clock} driving the simulation. */ - public Clock getClock() { + public InstantSource getClock() { return clock; } @@ -204,7 +200,7 @@ public final class FlowEngine implements Runnable { // Only schedule a new scheduler invocation in case the target is earlier than all other pending // scheduler invocations if (scheduled.tryAdd(target)) { - delay.invokeOnTimeout(target - now, this, coroutineContext); + dispatcher.schedule(target - now, this); } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java index ed5579ea..25f87e04 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java @@ -22,7 +22,7 @@ package org.opendc.simulator.flow2; -import java.time.Clock; +import java.time.InstantSource; import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; @@ -67,7 +67,7 @@ public final class FlowStage { */ int timerIndex = -1; - final Clock clock; + final InstantSource clock; private final FlowStageLogic logic; final FlowGraphInternal parentGraph; private final FlowEngine engine; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java index fba12aaf..16fed4eb 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java @@ -22,7 +22,7 @@ package org.opendc.simulator.flow2; -import java.time.Clock; +import java.time.InstantSource; import java.util.Objects; /** @@ -40,7 +40,7 @@ public final class InPort implements Inlet { OutPort output; private InHandler handler = InHandlers.noop(); - private final Clock clock; + private final InstantSource clock; private final String name; private final FlowStage stage; diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java index 332296a0..1f7ed4ee 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java @@ -22,7 +22,7 @@ package org.opendc.simulator.flow2; -import java.time.Clock; +import java.time.InstantSource; import java.util.Objects; /** @@ -42,7 +42,7 @@ public final class OutPort implements Outlet { private OutHandler handler = OutHandlers.noop(); private final String name; private final FlowStage stage; - private final Clock clock; + private final InstantSource clock; OutPort(FlowStage stage, String name, int id) { this.name = name; diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt index 839835ce..467bf334 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt @@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation class FlowEngineTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val multiplexer = MaxMinFlowMultiplexer(graph) @@ -55,7 +55,7 @@ class FlowEngineTest { @Test fun testConnectInvalidInlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val inlet = mockk<Inlet>() @@ -65,7 +65,7 @@ class FlowEngineTest { @Test fun testConnectInvalidOutlet() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val outlet = mockk<Outlet>() @@ -75,7 +75,7 @@ class FlowEngineTest { @Test fun testConnectInletBelongsToDifferentGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -87,7 +87,7 @@ class FlowEngineTest { @Test fun testConnectOutletBelongsToDifferentGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -99,7 +99,7 @@ class FlowEngineTest { @Test fun testConnectInletAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 2.0f) @@ -112,7 +112,7 @@ class FlowEngineTest { @Test fun testConnectOutletAlreadyConnected() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sinkA = SimpleFlowSink(graph, 2.0f) @@ -125,7 +125,7 @@ class FlowEngineTest { @Test fun testDisconnectInletInvalid() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val inlet = mockk<Inlet>() @@ -134,7 +134,7 @@ class FlowEngineTest { @Test fun testDisconnectOutletInvalid() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val outlet = mockk<Outlet>() @@ -143,7 +143,7 @@ class FlowEngineTest { @Test fun testDisconnectInletInvalidGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -154,7 +154,7 @@ class FlowEngineTest { @Test fun testDisconnectOutletInvalidGraph() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graphA = engine.newGraph() val graphB = engine.newGraph() @@ -165,7 +165,7 @@ class FlowEngineTest { @Test fun testInletEquality() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sinkA = SimpleFlowSink(graph, 2.0f) @@ -181,7 +181,7 @@ class FlowEngineTest { @Test fun testOutletEquality() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt index a2ed2195..fef49786 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt @@ -39,7 +39,7 @@ class ForwardingFlowMultiplexerTest { */ @Test fun testTrace() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = ForwardingFlowMultiplexer(graph) @@ -60,7 +60,7 @@ class ForwardingFlowMultiplexerTest { advanceUntilIdle() assertAll( - { assertEquals(4000, clock.millis()) { "Took enough time" } } + { assertEquals(4000, timeSource.millis()) { "Took enough time" } } ) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt index ba339ee3..ebae2d4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt @@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation class MaxMinFlowMultiplexerTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val switch = MaxMinFlowMultiplexer(graph) @@ -49,6 +49,6 @@ class MaxMinFlowMultiplexerTest { advanceUntilIdle() - assertEquals(500, clock.millis()) + assertEquals(500, timeSource.millis()) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt index a75efba3..ea516c63 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt @@ -37,7 +37,7 @@ import java.util.concurrent.ThreadLocalRandom class FlowSinkTest { @Test fun testSmoke() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -46,12 +46,12 @@ class FlowSinkTest { graph.connect(source.output, sink.input) advanceUntilIdle() - assertEquals(2000, clock.millis()) + assertEquals(2000, timeSource.millis()) } @Test fun testAdjustCapacity() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -64,12 +64,12 @@ class FlowSinkTest { advanceUntilIdle() - assertEquals(3000, clock.millis()) + assertEquals(3000, timeSource.millis()) } @Test fun testUtilization() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -78,12 +78,12 @@ class FlowSinkTest { graph.connect(source.output, sink.input) advanceUntilIdle() - assertEquals(4000, clock.millis()) + assertEquals(4000, timeSource.millis()) } @Test fun testFragments() = runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 1.0f) @@ -100,7 +100,7 @@ class FlowSinkTest { graph.connect(source.output, sink.input) advanceUntilIdle() - assertEquals(4000, clock.millis()) + assertEquals(4000, timeSource.millis()) } @Test @@ -114,7 +114,7 @@ class FlowSinkTest { ) return runSimulation { - val engine = FlowEngine.create(coroutineContext, clock) + val engine = FlowEngine.create(dispatcher) val graph = engine.newGraph() val sink = SimpleFlowSink(graph, 4200.0f) val source = TraceFlowSource(graph, trace) |
