summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt8
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt26
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt18
9 files changed, 48 insertions, 52 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
index fb112082..59dd3bad 100644
--- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow2/FlowBenchmarks.kt
@@ -60,7 +60,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkSink() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -71,7 +71,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkForward() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)
@@ -85,7 +85,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinSingleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -103,7 +103,7 @@ class FlowBenchmarks {
@Benchmark
fun benchmarkMuxMaxMinTripleSource() {
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
index 0ebb0da9..c0f52505 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowEngine.java
@@ -23,11 +23,11 @@
package org.opendc.simulator.flow2;
import java.time.Clock;
+import java.time.InstantSource;
import java.util.ArrayList;
import java.util.List;
-import kotlin.coroutines.ContinuationInterceptor;
import kotlin.coroutines.CoroutineContext;
-import kotlinx.coroutines.Delay;
+import org.opendc.common.Dispatcher;
/**
* A {@link FlowEngine} simulates a generic flow network.
@@ -56,29 +56,25 @@ public final class FlowEngine implements Runnable {
*/
private boolean active;
- private final CoroutineContext coroutineContext;
- private final Clock clock;
- private final Delay delay;
+ private final Dispatcher dispatcher;
+ private final InstantSource clock;
/**
- * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link Clock}.
+ * Create a new {@link FlowEngine} instance using the specified {@link CoroutineContext} and {@link InstantSource}.
*/
- public static FlowEngine create(CoroutineContext coroutineContext, Clock clock) {
- return new FlowEngine(coroutineContext, clock);
+ public static FlowEngine create(Dispatcher dispatcher) {
+ return new FlowEngine(dispatcher);
}
- FlowEngine(CoroutineContext coroutineContext, Clock clock) {
- this.coroutineContext = coroutineContext;
- this.clock = clock;
-
- CoroutineContext.Key<? extends ContinuationInterceptor> key = ContinuationInterceptor.Key;
- this.delay = (Delay) coroutineContext.get(key);
+ FlowEngine(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ this.clock = dispatcher.getTimeSource();
}
/**
* Obtain the (virtual) {@link Clock} driving the simulation.
*/
- public Clock getClock() {
+ public InstantSource getClock() {
return clock;
}
@@ -204,7 +200,7 @@ public final class FlowEngine implements Runnable {
// Only schedule a new scheduler invocation in case the target is earlier than all other pending
// scheduler invocations
if (scheduled.tryAdd(target)) {
- delay.invokeOnTimeout(target - now, this, coroutineContext);
+ dispatcher.schedule(target - now, this);
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
index ed5579ea..25f87e04 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@@ -67,7 +67,7 @@ public final class FlowStage {
*/
int timerIndex = -1;
- final Clock clock;
+ final InstantSource clock;
private final FlowStageLogic logic;
final FlowGraphInternal parentGraph;
private final FlowEngine engine;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
index fba12aaf..16fed4eb 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/InPort.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.Objects;
/**
@@ -40,7 +40,7 @@ public final class InPort implements Inlet {
OutPort output;
private InHandler handler = InHandlers.noop();
- private final Clock clock;
+ private final InstantSource clock;
private final String name;
private final FlowStage stage;
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
index 332296a0..1f7ed4ee 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
+++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/OutPort.java
@@ -22,7 +22,7 @@
package org.opendc.simulator.flow2;
-import java.time.Clock;
+import java.time.InstantSource;
import java.util.Objects;
/**
@@ -42,7 +42,7 @@ public final class OutPort implements Outlet {
private OutHandler handler = OutHandlers.noop();
private final String name;
private final FlowStage stage;
- private final Clock clock;
+ private final InstantSource clock;
OutPort(FlowStage stage, String name, int id) {
this.name = name;
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
index 839835ce..467bf334 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/FlowEngineTest.kt
@@ -38,7 +38,7 @@ import org.opendc.simulator.kotlin.runSimulation
class FlowEngineTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val multiplexer = MaxMinFlowMultiplexer(graph)
@@ -55,7 +55,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidInlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -65,7 +65,7 @@ class FlowEngineTest {
@Test
fun testConnectInvalidOutlet() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -75,7 +75,7 @@ class FlowEngineTest {
@Test
fun testConnectInletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -87,7 +87,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletBelongsToDifferentGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -99,7 +99,7 @@ class FlowEngineTest {
@Test
fun testConnectInletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 2.0f)
@@ -112,7 +112,7 @@ class FlowEngineTest {
@Test
fun testConnectOutletAlreadyConnected() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -125,7 +125,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val inlet = mockk<Inlet>()
@@ -134,7 +134,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalid() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val outlet = mockk<Outlet>()
@@ -143,7 +143,7 @@ class FlowEngineTest {
@Test
fun testDisconnectInletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -154,7 +154,7 @@ class FlowEngineTest {
@Test
fun testDisconnectOutletInvalidGraph() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graphA = engine.newGraph()
val graphB = engine.newGraph()
@@ -165,7 +165,7 @@ class FlowEngineTest {
@Test
fun testInletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sinkA = SimpleFlowSink(graph, 2.0f)
@@ -181,7 +181,7 @@ class FlowEngineTest {
@Test
fun testOutletEquality() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sourceA = SimpleFlowSource(graph, 2000.0f, 0.8f)
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
index a2ed2195..fef49786 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/ForwardingFlowMultiplexerTest.kt
@@ -39,7 +39,7 @@ class ForwardingFlowMultiplexerTest {
*/
@Test
fun testTrace() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = ForwardingFlowMultiplexer(graph)
@@ -60,7 +60,7 @@ class ForwardingFlowMultiplexerTest {
advanceUntilIdle()
assertAll(
- { assertEquals(4000, clock.millis()) { "Took enough time" } }
+ { assertEquals(4000, timeSource.millis()) { "Took enough time" } }
)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
index ba339ee3..ebae2d4e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/mux/MaxMinFlowMultiplexerTest.kt
@@ -35,7 +35,7 @@ import org.opendc.simulator.kotlin.runSimulation
class MaxMinFlowMultiplexerTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val switch = MaxMinFlowMultiplexer(graph)
@@ -49,6 +49,6 @@ class MaxMinFlowMultiplexerTest {
advanceUntilIdle()
- assertEquals(500, clock.millis())
+ assertEquals(500, timeSource.millis())
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
index a75efba3..ea516c63 100644
--- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow2/sink/FlowSinkTest.kt
@@ -37,7 +37,7 @@ import java.util.concurrent.ThreadLocalRandom
class FlowSinkTest {
@Test
fun testSmoke() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -46,12 +46,12 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(2000, clock.millis())
+ assertEquals(2000, timeSource.millis())
}
@Test
fun testAdjustCapacity() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -64,12 +64,12 @@ class FlowSinkTest {
advanceUntilIdle()
- assertEquals(3000, clock.millis())
+ assertEquals(3000, timeSource.millis())
}
@Test
fun testUtilization() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -78,12 +78,12 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
fun testFragments() = runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 1.0f)
@@ -100,7 +100,7 @@ class FlowSinkTest {
graph.connect(source.output, sink.input)
advanceUntilIdle()
- assertEquals(4000, clock.millis())
+ assertEquals(4000, timeSource.millis())
}
@Test
@@ -114,7 +114,7 @@ class FlowSinkTest {
)
return runSimulation {
- val engine = FlowEngine.create(coroutineContext, clock)
+ val engine = FlowEngine.create(dispatcher)
val graph = engine.newGraph()
val sink = SimpleFlowSink(graph, 4200.0f)
val source = TraceFlowSource(graph, trace)