summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt133
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt131
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt62
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt60
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt36
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt256
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt75
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt155
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt70
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt44
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt452
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt116
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt215
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt195
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt56
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt100
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt154
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt789
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt57
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt52
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt77
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt67
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt104
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt321
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt241
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt154
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt149
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt57
34 files changed, 4673 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
new file mode 100644
index 00000000..aabd2220
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow.source.TraceFlowSource
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.TimeUnit
+
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@OptIn(ExperimentalCoroutinesApi::class)
+class FlowBenchmarks {
+ private lateinit var trace: Sequence<TraceFlowSource.Fragment>
+
+ @Setup
+ fun setUp() {
+ val random = ThreadLocalRandom.current()
+ val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
+ trace = entries.asSequence()
+ }
+
+ @Benchmark
+ fun benchmarkSink() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
+ val provider = FlowSink(engine, 4200.0)
+ return@runBlockingSimulation provider.consume(TraceFlowSource(trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkForward() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
+ val provider = FlowSink(engine, 4200.0)
+ val forwarder = FlowForwarder(engine)
+ provider.startConsumer(forwarder)
+ return@runBlockingSimulation forwarder.consume(TraceFlowSource(trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxMaxMinSingleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(engine)
+
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxMaxMinTripleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(engine)
+
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+
+ repeat(3) {
+ launch {
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(trace))
+ }
+ }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxExclusiveSingleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
+ val switch = ForwardingFlowMultiplexer(engine)
+
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxExclusiveTripleSource() {
+ return runBlockingSimulation {
+ val engine = FlowEngine(coroutineContext, clock)
+ val switch = ForwardingFlowMultiplexer(engine)
+
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+ FlowSink(engine, 3000.0).startConsumer(switch.newOutput())
+
+ repeat(2) {
+ launch {
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(trace))
+ }
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
new file mode 100644
index 00000000..8ff0bc76
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * An active connection between a [FlowSource] and [FlowConsumer].
+ */
+public interface FlowConnection : AutoCloseable {
+ /**
+ * The capacity of the connection.
+ */
+ public val capacity: Double
+
+ /**
+ * The flow rate over the connection.
+ */
+ public val rate: Double
+
+ /**
+ * The flow demand of the source.
+ */
+ public val demand: Double
+
+ /**
+ * A flag to control whether [FlowSource.onConverge] should be invoked for this source.
+ */
+ public var shouldSourceConverge: Boolean
+
+ /**
+ * Pull the source.
+ */
+ public fun pull()
+
+ /**
+ * Pull the source.
+ *
+ * @param now The timestamp at which the connection is pulled.
+ */
+ public fun pull(now: Long)
+
+ /**
+ * Push the given flow [rate] over this connection.
+ *
+ * @param rate The rate of the flow to push.
+ */
+ public fun push(rate: Double)
+
+ /**
+ * Disconnect the consumer from its source.
+ */
+ public override fun close()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
new file mode 100644
index 00000000..4685a755
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+
+/**
+ * A consumer of a [FlowSource].
+ */
+public interface FlowConsumer {
+ /**
+ * A flag to indicate that the consumer is currently consuming a [FlowSource].
+ */
+ public val isActive: Boolean
+
+ /**
+ * The flow capacity of this consumer.
+ */
+ public val capacity: Double
+
+ /**
+ * The current flow rate of the consumer.
+ */
+ public val rate: Double
+
+ /**
+ * The current flow demand.
+ */
+ public val demand: Double
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public val counters: FlowCounters
+
+ /**
+ * Start consuming the specified [source].
+ *
+ * @throws IllegalStateException if the consumer is already active.
+ */
+ public fun startConsumer(source: FlowSource)
+
+ /**
+ * Ask the consumer to pull its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
+ */
+ public fun pull()
+
+ /**
+ * Disconnect the consumer from its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
+ */
+ public fun cancel()
+}
+
+/**
+ * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
+ */
+public suspend fun FlowConsumer.consume(source: FlowSource) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ try {
+ source.onStart(conn, now)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ source.onStop(conn, now, delta)
+
+ if (!cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return try {
+ source.onPull(conn, now, delta)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ source.onConverge(conn, now, delta)
+ } catch (cause: Throwable) {
+ cont.resumeWithException(cause)
+ throw cause
+ }
+ }
+
+ override fun toString(): String = "SuspendingFlowSource"
+ })
+
+ cont.invokeOnCancellation { cancel() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
new file mode 100644
index 00000000..98922ab3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A controllable [FlowConnection].
+ *
+ * This interface is used by [FlowConsumer]s to control the connection between it and the source.
+ */
+public interface FlowConsumerContext : FlowConnection {
+ /**
+ * The deadline of the source.
+ */
+ public val deadline: Long
+
+ /**
+ * The capacity of the connection.
+ */
+ public override var capacity: Double
+
+ /**
+ * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer.
+ */
+ public var shouldConsumerConverge: Boolean
+
+ /**
+ * A flag to control whether the timers for the [FlowSource] should be enabled.
+ */
+ public var enableTimers: Boolean
+
+ /**
+ * Start the flow over the connection.
+ */
+ public fun start()
+
+ /**
+ * Synchronously pull the source of the connection.
+ *
+ * @param now The timestamp at which the connection is pulled.
+ */
+ public fun pullSync(now: Long)
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
new file mode 100644
index 00000000..50fbc9c7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A collection of callbacks associated with a [FlowConsumer].
+ */
+public interface FlowConsumerLogic {
+ /**
+ * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the update is occurring.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ * @param rate The requested processing rate of the source.
+ */
+ public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
+
+ /**
+ * This method is invoked when the flow graph has converged into a steady-state system.
+ *
+ * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method
+ * will not be invoked.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
+ */
+ public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+
+ /**
+ * This method is invoked when the [FlowSource] completed or failed.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the provider finished.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ * @param cause The cause of the failure or `null` if the source completed.
+ */
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {}
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
new file mode 100644
index 00000000..d1afda6f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A listener interface for when a flow stage has converged into a steady-state.
+ */
+public interface FlowConvergenceListener {
+ /**
+ * This method is invoked when the system has converged to a steady-state.
+ *
+ * @param now The timestamp at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
+ */
+ public fun onConverge(now: Long, delta: Long) {}
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
new file mode 100644
index 00000000..a717ae6e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * An interface that tracks cumulative counts of the flow accumulation over a stage.
+ */
+public interface FlowCounters {
+ /**
+ * The accumulated flow that a source wanted to push over the connection.
+ */
+ public val demand: Double
+
+ /**
+ * The accumulated flow that was actually transferred over the connection.
+ */
+ public val actual: Double
+
+ /**
+ * The amount of capacity that was not utilized.
+ */
+ public val remaining: Double
+
+ /**
+ * The accumulated flow lost due to interference between sources.
+ */
+ public val interference: Double
+
+ /**
+ * Reset the flow counters.
+ */
+ public fun reset()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
new file mode 100644
index 00000000..65224827
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
+ *
+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
+ * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
+ */
+public interface FlowEngine {
+ /**
+ * The virtual [Clock] associated with this engine.
+ */
+ public val clock: Clock
+
+ /**
+ * Create a new [FlowConsumerContext] with the given [provider].
+ *
+ * @param consumer The consumer logic.
+ * @param provider The logic of the resource provider.
+ */
+ public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
+
+ /**
+ * Start batching the execution of resource updates until [popBatch] is called.
+ *
+ * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
+ * simultaneously) in a single state update.
+ *
+ * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
+ * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
+ * the same amount of times. To simplify batching, see [batch].
+ */
+ public fun pushBatch()
+
+ /**
+ * Stop the batching of resource updates and run the interpreter on the batch.
+ *
+ * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
+ */
+ public fun popBatch()
+
+ public companion object {
+ /**
+ * Construct a new [FlowEngine] implementation.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+ @JvmStatic
+ @JvmName("create")
+ public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
+ return FlowEngineImpl(context, clock)
+ }
+ }
+}
+
+/**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+public inline fun FlowEngine.batch(block: () -> Unit) {
+ try {
+ pushBatch()
+ block()
+ } finally {
+ popBatch()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
new file mode 100644
index 00000000..e3bdd7ba
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -0,0 +1,256 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import mu.KotlinLogging
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
+import kotlin.math.max
+
+/**
+ * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
+ *
+ * @param engine The [FlowEngine] the forwarder runs in.
+ * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
+ */
+public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable {
+ /**
+ * The logging instance of this connection.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The delegate [FlowSource].
+ */
+ private var delegate: FlowSource? = null
+
+ /**
+ * A flag to indicate that the delegate was started.
+ */
+ private var hasDelegateStarted: Boolean = false
+
+ /**
+ * The exposed [FlowConnection].
+ */
+ private val _ctx = object : FlowConnection {
+ override var shouldSourceConverge: Boolean = false
+ set(value) {
+ field = value
+ _innerCtx?.shouldSourceConverge = value
+ }
+
+ override val capacity: Double
+ get() = _innerCtx?.capacity ?: 0.0
+
+ override val demand: Double
+ get() = _innerCtx?.demand ?: 0.0
+
+ override val rate: Double
+ get() = _innerCtx?.rate ?: 0.0
+
+ override fun pull() {
+ _innerCtx?.pull()
+ }
+
+ override fun pull(now: Long) {
+ _innerCtx?.pull(now)
+ }
+
+ @JvmField var lastPull = Long.MAX_VALUE
+
+ override fun push(rate: Double) {
+ if (delegate == null) {
+ return
+ }
+
+ _innerCtx?.push(rate)
+ _demand = rate
+ }
+
+ override fun close() {
+ val delegate = delegate ?: return
+ val hasDelegateStarted = hasDelegateStarted
+
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ if (hasDelegateStarted) {
+ val now = engine.clock.millis()
+ val delta = max(0, now - lastPull)
+ delegate.onStop(this, now, delta)
+ }
+ }
+ }
+
+ /**
+ * The [FlowConnection] in which the forwarder runs.
+ */
+ private var _innerCtx: FlowConnection? = null
+
+ override val isActive: Boolean
+ get() = delegate != null
+
+ override val capacity: Double
+ get() = _ctx.capacity
+
+ override val rate: Double
+ get() = _ctx.rate
+
+ override val demand: Double
+ get() = _ctx.demand
+
+ override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
+
+ override fun startConsumer(source: FlowSource) {
+ check(delegate == null) { "Forwarder already active" }
+
+ delegate = source
+
+ // Pull to replace the source
+ pull()
+ }
+
+ override fun pull() {
+ _ctx.pull()
+ }
+
+ override fun cancel() {
+ _ctx.close()
+ }
+
+ override fun close() {
+ val ctx = _innerCtx
+
+ if (ctx != null) {
+ this._innerCtx = null
+ ctx.pull()
+ }
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _innerCtx = conn
+
+ if (_ctx.shouldSourceConverge) {
+ conn.shouldSourceConverge = true
+ }
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _innerCtx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+
+ try {
+ delegate.onStop(this._ctx, now, delta)
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+ }
+ }
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val delegate = delegate
+
+ if (!hasDelegateStarted) {
+ start()
+ }
+
+ _ctx.lastPull = now
+ updateCounters(conn, delta)
+
+ return try {
+ delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+
+ reset()
+ Long.MAX_VALUE
+ }
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate?.onConverge(this._ctx, now, delta)
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+
+ _innerCtx = null
+ reset()
+ }
+ }
+
+ /**
+ * Start the delegate.
+ */
+ private fun start() {
+ val delegate = delegate ?: return
+
+ try {
+ delegate.onStart(_ctx, engine.clock.millis())
+ hasDelegateStarted = true
+ } catch (cause: Throwable) {
+ logger.error(cause) { "Uncaught exception" }
+ reset()
+ }
+ }
+
+ /**
+ * Reset the delegate.
+ */
+ private fun reset() {
+ if (isCoupled)
+ _innerCtx?.close()
+ else
+ _innerCtx?.push(0.0)
+
+ delegate = null
+ hasDelegateStarted = false
+ }
+
+ /**
+ * The requested flow rate.
+ */
+ private var _demand: Double = 0.0
+
+ /**
+ * Update the flow counters for the transformer.
+ */
+ private fun updateCounters(ctx: FlowConnection, delta: Long) {
+ if (delta <= 0) {
+ return
+ }
+
+ val counters = _counters
+ val deltaS = delta * D_MS_TO_S
+ val total = ctx.capacity * deltaS
+ val work = _demand * deltaS
+ val actualWork = ctx.rate * deltaS
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
new file mode 100644
index 00000000..6867bcef
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A [FlowConsumer] that maps the pushed flow through [transform].
+ *
+ * @param source The source of the flow.
+ * @param transform The method to transform the flow.
+ */
+public class FlowMapper(
+ private val source: FlowSource,
+ private val transform: (FlowConnection, Double) -> Double
+) : FlowSource {
+
+ /**
+ * The current active connection.
+ */
+ private var _conn: Connection? = null
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ check(_conn == null) { "Concurrent access" }
+ val delegate = Connection(conn, transform)
+ _conn = delegate
+ source.onStart(delegate, now)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ val delegate = checkNotNull(_conn) { "Invariant violation" }
+ _conn = null
+ source.onStop(delegate, now, delta)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val delegate = checkNotNull(_conn) { "Invariant violation" }
+ return source.onPull(delegate, now, delta)
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ val delegate = _conn ?: return
+ source.onConverge(delegate, now, delta)
+ }
+
+ /**
+ * The wrapper [FlowConnection] that is used to transform the flow.
+ */
+ private class Connection(
+ private val delegate: FlowConnection,
+ private val transform: (FlowConnection, Double) -> Double
+ ) : FlowConnection by delegate {
+ override fun push(rate: Double) {
+ delegate.push(transform(this, rate))
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
new file mode 100644
index 00000000..e9094443
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -0,0 +1,155 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
+
+/**
+ * A [FlowSink] represents a sink with a fixed capacity.
+ *
+ * @param initialCapacity The initial capacity of the resource.
+ * @param engine The engine that is used for driving the flow simulation.
+ * @param parent The parent flow system.
+ */
+public class FlowSink(
+ private val engine: FlowEngine,
+ initialCapacity: Double,
+ private val parent: FlowConvergenceListener? = null
+) : FlowConsumer {
+ /**
+ * A flag to indicate that the flow consumer is active.
+ */
+ public override val isActive: Boolean
+ get() = _ctx != null
+
+ /**
+ * The capacity of the consumer.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ _ctx?.capacity = value
+ }
+
+ /**
+ * The current processing rate of the consumer.
+ */
+ public override val rate: Double
+ get() = _ctx?.rate ?: 0.0
+
+ /**
+ * The flow processing rate demand at this instant.
+ */
+ public override val demand: Double
+ get() = _ctx?.demand ?: 0.0
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
+
+ /**
+ * The current active [FlowConsumerLogic] of this sink.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ override fun startConsumer(source: FlowSource) {
+ check(_ctx == null) { "Consumer is in invalid state" }
+
+ val ctx = engine.newContext(source, Logic(parent, _counters))
+ _ctx = ctx
+
+ ctx.capacity = capacity
+ if (parent != null) {
+ ctx.shouldConsumerConverge = true
+ }
+
+ ctx.start()
+ }
+
+ override fun pull() {
+ _ctx?.pull()
+ }
+
+ override fun cancel() {
+ _ctx?.close()
+ }
+
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
+
+ /**
+ * [FlowConsumerLogic] of a sink.
+ */
+ private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ updateCounters(ctx, delta, rate, ctx.capacity)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ updateCounters(ctx, delta, 0.0, 0.0)
+
+ _ctx = null
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now, delta)
+ }
+
+ /**
+ * The previous demand and capacity for the consumer.
+ */
+ private val _previous = DoubleArray(2)
+
+ /**
+ * Update the counters of the flow consumer.
+ */
+ private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) {
+ val counters = counters
+ val previous = _previous
+ val demand = previous[0]
+ val capacity = previous[1]
+
+ previous[0] = nextDemand
+ previous[1] = nextCapacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta * D_MS_TO_S
+ val total = demand * deltaS
+ val work = capacity * deltaS
+ val actualWork = ctx.rate * deltaS
+
+ counters.increment(work, actualWork, (total - actualWork), 0.0)
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
new file mode 100644
index 00000000..3a7e52aa
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A source of flow that is consumed by a [FlowConsumer].
+ *
+ * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
+ * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
+ */
+public interface FlowSource {
+ /**
+ * This method is invoked when the source is started.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the provider finished.
+ */
+ public fun onStart(conn: FlowConnection, now: Long) {}
+
+ /**
+ * This method is invoked when the source is finished.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the source finished.
+ * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
+ */
+ public fun onStop(conn: FlowConnection, now: Long, delta: Long) {}
+
+ /**
+ * This method is invoked when the source is pulled.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the pull is occurring.
+ * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
+ * @return The duration after which the resource consumer should be pulled again.
+ */
+ public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
+
+ /**
+ * This method is invoked when the flow graph has converged into a steady-state system.
+ *
+ * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method
+ * will not be invoked.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
+ */
+ public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {}
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
new file mode 100644
index 00000000..aa2713b6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
@@ -0,0 +1,19 @@
+package org.opendc.simulator.flow.interference
+
+import org.opendc.simulator.flow.FlowSource
+
+/**
+ * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur
+ * performance variability due to operating on the same resource and therefore causing interference.
+ */
+public interface InterferenceDomain {
+ /**
+ * Compute the performance score of a participant in this interference domain.
+ *
+ * @param key The participant to obtain the score of or `null` if the participant has no key.
+ * @param load The overall load on the interference domain.
+ * @return A score representing the performance score to be applied to the resource consumer, with 1
+ * meaning no influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+ public fun apply(key: InterferenceKey?, load: Double): Double
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
new file mode 100644
index 00000000..d28ebde5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.interference
+
+/**
+ * A key that uniquely identifies a participant of an interference domain.
+ */
+public interface InterferenceKey
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
new file mode 100644
index 00000000..450195ec
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Constant for converting milliseconds into seconds.
+ */
+internal const val D_MS_TO_S = 1 / 1000.0
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
new file mode 100644
index 00000000..97d56fff
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * States of the flow connection.
+ */
+internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source
+internal const val ConnActive = 1 // Connection is active and the source is currently being consumed
+internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore
+internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection
+
+/**
+ * Flags of the flow connection
+ */
+internal const val ConnPulled = 1 shl 2 // The source should be pulled
+internal const val ConnPushed = 1 shl 3 // The source has pushed a value
+internal const val ConnClose = 1 shl 4 // The connection should be closed
+internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active
+internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending
+internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
+internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
+internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
+internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
new file mode 100644
index 00000000..58ca918b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -0,0 +1,452 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import mu.KotlinLogging
+import org.opendc.simulator.flow.*
+import java.util.*
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
+ */
+internal class FlowConsumerContextImpl(
+ private val engine: FlowEngineImpl,
+ private val source: FlowSource,
+ private val logic: FlowConsumerLogic
+) : FlowConsumerContext {
+ /**
+ * The logging instance of this connection.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The capacity of the connection.
+ */
+ override var capacity: Double
+ get() = _capacity
+ set(value) {
+ val oldValue = _capacity
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ _capacity = value
+ pull()
+ }
+ }
+ private var _capacity: Double = 0.0
+
+ /**
+ * The current processing rate of the connection.
+ */
+ override val rate: Double
+ get() = _rate
+ private var _rate = 0.0
+
+ /**
+ * The current flow processing demand.
+ */
+ override val demand: Double
+ get() = _demand
+ private var _demand: Double = 0.0 // The current (pending) demand of the source
+
+ /**
+ * The deadline of the source.
+ */
+ override val deadline: Long
+ get() = _deadline
+ private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
+
+ /**
+ * Flags to control the convergence of the consumer and source.
+ */
+ override var shouldSourceConverge: Boolean
+ get() = _flags and ConnConvergeSource == ConnConvergeSource
+ set(value) {
+ _flags =
+ if (value)
+ _flags or ConnConvergeSource
+ else
+ _flags and ConnConvergeSource.inv()
+ }
+ override var shouldConsumerConverge: Boolean
+ get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer
+ set(value) {
+ _flags =
+ if (value)
+ _flags or ConnConvergeConsumer
+ else
+ _flags and ConnConvergeConsumer.inv()
+ }
+
+ /**
+ * Flag to control the timers on the [FlowSource]
+ */
+ override var enableTimers: Boolean
+ get() = _flags and ConnDisableTimers != ConnDisableTimers
+ set(value) {
+ _flags =
+ if (!value)
+ _flags or ConnDisableTimers
+ else
+ _flags and ConnDisableTimers.inv()
+ }
+
+ /**
+ * The clock to track simulation time.
+ */
+ private val _clock = engine.clock
+
+ /**
+ * The flags of the flow connection, indicating certain actions.
+ */
+ private var _flags: Int = 0
+
+ /**
+ * The timestamp of calls to the callbacks.
+ */
+ private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull`
+ private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush`
+ private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence`
+ private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence`
+
+ /**
+ * The timers at which the context is scheduled to be interrupted.
+ */
+ private var _timer: Long = Long.MAX_VALUE
+ private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
+
+ override fun start() {
+ check(_flags and ConnState == ConnPending) { "Consumer is already started" }
+ engine.batch {
+ val now = _clock.millis()
+ source.onStart(this, now)
+
+ // Mark the connection as active and pulled
+ val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled
+ scheduleImmediate(now, newFlags)
+ }
+ }
+
+ override fun close() {
+ val flags = _flags
+ if (flags and ConnState == ConnClosed) {
+ return
+ }
+
+ // Toggle the close bit. In case no update is active, schedule a new update.
+ if (flags and ConnUpdateActive == 0) {
+ val now = _clock.millis()
+ scheduleImmediate(now, flags or ConnClose)
+ } else {
+ _flags = flags or ConnClose
+ }
+ }
+
+ override fun pull(now: Long) {
+ val flags = _flags
+ if (flags and ConnState != ConnActive) {
+ return
+ }
+
+ // Mark connection as pulled
+ scheduleImmediate(now, flags or ConnPulled)
+ }
+
+ override fun pull() {
+ pull(_clock.millis())
+ }
+
+ override fun pullSync(now: Long) {
+ val flags = _flags
+
+ // Do not attempt to flush the connection if the connection is closed or an update is already active
+ if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) {
+ return
+ }
+
+ if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) {
+ engine.scheduleSync(now, this)
+ }
+ }
+
+ override fun push(rate: Double) {
+ if (_demand == rate) {
+ return
+ }
+
+ _demand = rate
+
+ val flags = _flags
+
+ if (flags and ConnUpdateActive != 0) {
+ // If an update is active, it will already get picked up at the end of the update
+ _flags = flags or ConnPushed
+ } else {
+ // Invalidate only if no update is active
+ scheduleImmediate(_clock.millis(), flags or ConnPushed)
+ }
+ }
+
+ /**
+ * Update the state of the flow connection.
+ *
+ * @param now The current virtual timestamp.
+ * @param visited The queue of connections that have been visited during the cycle.
+ * @param timerQueue The queue of all pending timers.
+ * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update.
+ */
+ fun doUpdate(
+ now: Long,
+ visited: FlowDeque,
+ timerQueue: FlowTimerQueue,
+ isImmediate: Boolean
+ ) {
+ var flags = _flags
+
+ // Precondition: The flow connection must be active
+ if (flags and ConnState != ConnActive) {
+ return
+ }
+
+ val deadline = _deadline
+ val reachedDeadline = deadline == now
+ var newDeadline: Long
+ var hasUpdated = false
+
+ try {
+ // Pull the source if (1) `pull` is called or (2) the timer of the source has expired
+ newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
+ val lastPull = _lastPull
+ val delta = max(0, now - lastPull)
+
+ // Update state before calling into the outside world, so it observes a consistent state
+ _lastPull = now
+ _flags = (flags and ConnPulled.inv()) or ConnUpdateActive
+ hasUpdated = true
+
+ val duration = source.onPull(this, now, delta)
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = _flags
+
+ if (duration != Long.MAX_VALUE)
+ now + duration
+ else
+ duration
+ } else {
+ deadline
+ }
+
+ // Make the new deadline available for the consumer if it has changed
+ if (newDeadline != deadline) {
+ _deadline = newDeadline
+ }
+
+ // Push to the consumer if the rate of the source has changed (after a call to `push`)
+ if (flags and ConnPushed != 0) {
+ val lastPush = _lastPush
+ val delta = max(0, now - lastPush)
+
+ // Update state before calling into the outside world, so it observes a consistent state
+ _lastPush = now
+ _flags = (flags and ConnPushed.inv()) or ConnUpdateActive
+ hasUpdated = true
+
+ logic.onPush(this, now, delta, _demand)
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = _flags
+ }
+
+ // Check whether the source or consumer have tried to close the connection
+ if (flags and ConnClose != 0) {
+ hasUpdated = true
+
+ // The source has called [FlowConnection.close], so clean up the connection
+ doStopSource(now)
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ // We now also mark the connection as closed
+ flags = (_flags and ConnState.inv()) or ConnClosed
+
+ _demand = 0.0
+ newDeadline = Long.MAX_VALUE
+ }
+ } catch (cause: Throwable) {
+ hasUpdated = true
+
+ // Clean up the connection
+ doFailSource(now, cause)
+
+ // Mark the connection as closed
+ flags = (flags and ConnState.inv()) or ConnClosed
+
+ _demand = 0.0
+ newDeadline = Long.MAX_VALUE
+ }
+
+ // Check whether the connection needs to be added to the visited queue. This is the case when:
+ // (1) An update was performed (either a push or a pull)
+ // (2) Either the source or consumer want to converge, and
+ // (3) Convergence is not already pending (ConnConvergePending)
+ if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) {
+ visited.add(this)
+ flags = flags or ConnConvergePending
+ }
+
+ // Compute the new flow rate of the connection
+ // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value
+ _rate = min(_capacity, _demand)
+
+ // Indicate that no update is active anymore and flush the flags
+ _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv()
+
+ val pendingTimers = _pendingTimers
+
+ // Prune the head timer if this is a delayed update
+ val timer = if (!isImmediate) {
+ // Invariant: Any pending timer should only point to a future timestamp
+ val timer = pendingTimers.poll() ?: Long.MAX_VALUE
+ _timer = timer
+ timer
+ } else {
+ _timer
+ }
+
+ // Check whether we need to schedule a new timer for this connection. That is the case when:
+ // (1) The deadline is valid (not the maximum value)
+ // (2) The connection is active
+ // (3) Timers are not disabled for the source
+ // (4) The current active timer for the connection points to a later deadline
+ if (newDeadline == Long.MAX_VALUE ||
+ flags and ConnState != ConnActive ||
+ flags and ConnDisableTimers != 0 ||
+ (timer != Long.MAX_VALUE && newDeadline >= timer)
+ ) {
+ // Ignore any deadline scheduled at the maximum value
+ // This indicates that the source does not want to register a timer
+ return
+ }
+
+ // Construct a timer for the new deadline and add it to the global queue of timers
+ _timer = newDeadline
+ timerQueue.add(this, newDeadline)
+
+ // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
+ if (timer != Long.MAX_VALUE) {
+ pendingTimers.addFirst(timer)
+ }
+ }
+
+ /**
+ * This method is invoked when the system converges into a steady state.
+ */
+ fun onConverge(now: Long) {
+ try {
+ val flags = _flags
+
+ // The connection is converging now, so unset the convergence pending flag
+ _flags = flags and ConnConvergePending.inv()
+
+ // Call the source converge callback if it has enabled convergence
+ if (flags and ConnConvergeSource != 0) {
+ val delta = max(0, now - _lastSourceConvergence)
+ _lastSourceConvergence = now
+
+ source.onConverge(this, now, delta)
+ }
+
+ // Call the consumer callback if it has enabled convergence
+ if (flags and ConnConvergeConsumer != 0) {
+ val delta = max(0, now - _lastConsumerConvergence)
+ _lastConsumerConvergence = now
+
+ logic.onConverge(this, now, delta)
+ }
+ } catch (cause: Throwable) {
+ // Invoke the finish callbacks
+ doFailSource(now, cause)
+
+ // Mark the connection as closed
+ _flags = (_flags and ConnState.inv()) or ConnClosed
+ _demand = 0.0
+ _deadline = Long.MAX_VALUE
+ }
+ }
+
+ override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
+
+ /**
+ * Stop the [FlowSource].
+ */
+ private fun doStopSource(now: Long) {
+ try {
+ source.onStop(this, now, max(0, now - _lastPull))
+ doFinishConsumer(now, null)
+ } catch (cause: Throwable) {
+ doFinishConsumer(now, cause)
+ }
+ }
+
+ /**
+ * Fail the [FlowSource].
+ */
+ private fun doFailSource(now: Long, cause: Throwable) {
+ try {
+ source.onStop(this, now, max(0, now - _lastPull))
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ doFinishConsumer(now, e)
+ }
+ }
+
+ /**
+ * Finish the consumer.
+ */
+ private fun doFinishConsumer(now: Long, cause: Throwable?) {
+ try {
+ logic.onFinish(this, now, max(0, now - _lastPush), cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ logger.error(e) { "Uncaught exception" }
+ }
+ }
+
+ /**
+ * Schedule an immediate update for this connection.
+ */
+ private fun scheduleImmediate(now: Long, flags: Int) {
+ // In case an immediate update is already scheduled, no need to do anything
+ if (flags and ConnUpdatePending != 0) {
+ _flags = flags
+ return
+ }
+
+ // Mark the connection that there is an update pending
+ _flags = flags or ConnUpdatePending
+
+ engine.scheduleImmediate(now, this)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
new file mode 100644
index 00000000..c6cba4b7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import java.util.*
+
+/**
+ * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations.
+ */
+internal class FlowDeque(initialCapacity: Int = 256) {
+ /**
+ * The array of elements in the queue.
+ */
+ private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
+ private var _head = 0
+ private var _tail = 0
+
+ /**
+ * Determine whether this queue is not empty.
+ */
+ fun isNotEmpty(): Boolean {
+ return _head != _tail
+ }
+
+ /**
+ * Add the specified [ctx] to the queue.
+ */
+ fun add(ctx: FlowConsumerContextImpl) {
+ val es = _elements
+ var tail = _tail
+
+ es[tail] = ctx
+
+ tail = inc(tail, es.size)
+ _tail = tail
+
+ if (_head == tail) {
+ doubleCapacity()
+ }
+ }
+
+ /**
+ * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
+ */
+ fun poll(): FlowConsumerContextImpl? {
+ val es = _elements
+ val head = _head
+ val ctx = es[head]
+
+ if (ctx != null) {
+ es[head] = null
+ _head = inc(head, es.size)
+ }
+
+ return ctx
+ }
+
+ /**
+ * Clear the queue.
+ */
+ fun clear() {
+ _elements.fill(null)
+ _head = 0
+ _tail = 0
+ }
+
+ private fun inc(i: Int, modulus: Int): Int {
+ var x = i
+ if (++x >= modulus) {
+ x = 0
+ }
+ return x
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private fun doubleCapacity() {
+ assert(_head == _tail)
+ val p = _head
+ val n = _elements.size
+ val r = n - p // number of elements to the right of p
+
+ val newCapacity = n shl 1
+ check(newCapacity >= 0) { "Sorry, deque too big" }
+
+ val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
+
+ _elements.copyInto(a, 0, p, r)
+ _elements.copyInto(a, r, 0, p)
+
+ _elements = a
+ _head = 0
+ _tail = n
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
new file mode 100644
index 00000000..3c79d54e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -0,0 +1,215 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import org.opendc.simulator.flow.*
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Internal implementation of the [FlowEngine] interface.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable {
+ /**
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The queue of connection updates that are scheduled for immediate execution.
+ */
+ private val queue = FlowDeque()
+
+ /**
+ * A priority queue containing the connection updates to be scheduled in the future.
+ */
+ private val futureQueue = FlowTimerQueue()
+
+ /**
+ * The stack of engine invocations to occur in the future.
+ */
+ private val futureInvocations = ArrayDeque<Invocation>()
+
+ /**
+ * The systems that have been visited during the engine cycle.
+ */
+ private val visited = FlowDeque()
+
+ /**
+ * The index in the batch stack.
+ */
+ private var batchIndex = 0
+
+ /**
+ * The virtual [Clock] of this engine.
+ */
+ override val clock: Clock
+ get() = _clock
+ private val _clock: Clock = clock
+
+ /**
+ * Update the specified [ctx] synchronously.
+ */
+ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (batchIndex > 0) {
+ return
+ }
+
+ doRunEngine(now)
+ }
+
+ /**
+ * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
+ *
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed. In case no engine is currently active, the engine will be started.
+ */
+ fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
+ queue.add(ctx)
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (batchIndex > 0) {
+ return
+ }
+
+ doRunEngine(now)
+ }
+
+ override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
+
+ override fun pushBatch() {
+ batchIndex++
+ }
+
+ override fun popBatch() {
+ try {
+ // Flush the work if the engine is not already running
+ if (batchIndex == 1 && queue.isNotEmpty()) {
+ doRunEngine(_clock.millis())
+ }
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /* Runnable */
+ override fun run() {
+ val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
+ doRunEngine(invocation.timestamp)
+ }
+
+ /**
+ * Run all the enqueued actions for the specified [timestamp][now].
+ */
+ private fun doRunEngine(now: Long) {
+ val queue = queue
+ val futureQueue = futureQueue
+ val futureInvocations = futureInvocations
+ val visited = visited
+
+ try {
+ // Increment batch index so synchronous calls will not launch concurrent engine invocations
+ batchIndex++
+
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ val ctx = futureQueue.poll(now) ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
+ }
+
+ // Repeat execution of all immediate updates until the system has converged to a steady-state
+ // We have to take into account that the onConverge callback can also trigger new actions.
+ do {
+ // Execute all immediate updates
+ while (true) {
+ val ctx = queue.poll() ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
+ }
+
+ while (true) {
+ val ctx = visited.poll() ?: break
+ ctx.onConverge(now)
+ }
+ } while (queue.isNotEmpty())
+ } finally {
+ // Decrement batch index to indicate no engine is active at the moment
+ batchIndex--
+ }
+
+ // Schedule an engine invocation for the next update to occur.
+ val headDeadline = futureQueue.peekDeadline()
+ if (headDeadline != Long.MAX_VALUE) {
+ trySchedule(now, futureInvocations, headDeadline)
+ }
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
+ */
+ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
+ val head = scheduled.peek()
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(target - now, this, context)
+ scheduled.addFirst(Invocation(target, handle))
+ }
+ }
+
+ /**
+ * A future engine invocation.
+ *
+ * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
+ * the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private class Invocation(
+ @JvmField val timestamp: Long,
+ @JvmField val handle: DisposableHandle
+ ) {
+ /**
+ * Cancel the engine invocation.
+ */
+ fun cancel() = handle.dispose()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
new file mode 100644
index 00000000..22a390e6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Specialized priority queue for flow timers.
+ */
+internal class FlowTimerQueue(initialCapacity: Int = 256) {
+ /**
+ * The binary heap of deadlines.
+ */
+ private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
+
+ /**
+ * The binary heap of [FlowConsumerContextImpl]s.
+ */
+ private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private var size = 0
+
+ /**
+ * Register a timer for [ctx] with [deadline].
+ */
+ fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
+ val i = size
+ val deadlines = _deadlines
+ if (i >= deadlines.size) {
+ grow()
+ }
+
+ siftUp(deadlines, _pending, i, ctx, deadline)
+
+ size = i + 1
+ }
+
+ /**
+ * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
+ */
+ fun poll(now: Long): FlowConsumerContextImpl? {
+ if (size == 0) {
+ return null
+ }
+
+ val deadlines = _deadlines
+ val deadline = deadlines[0]
+
+ if (now < deadline) {
+ return null
+ }
+
+ val pending = _pending
+ val res = pending[0]
+ val s = --size
+
+ val nextDeadline = deadlines[s]
+ val next = pending[s]!!
+
+ // Clear the last element of the queue
+ pending[s] = null
+ deadlines[s] = Long.MIN_VALUE
+
+ if (s != 0) {
+ siftDown(deadlines, pending, next, nextDeadline)
+ }
+
+ return res
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ */
+ fun peekDeadline(): Long {
+ return if (size == 0) Long.MAX_VALUE else _deadlines[0]
+ }
+
+ /**
+ * Increases the capacity of the array.
+ */
+ private fun grow() {
+ val oldCapacity = _deadlines.size
+ // Double size if small; else grow by 50%
+ val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
+
+ _deadlines = _deadlines.copyOf(newCapacity)
+ _pending = _pending.copyOf(newCapacity)
+ }
+
+ /**
+ * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
+ * greater than or equal to its parent, or is the root.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param pos The position to fill.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftUp(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ pos: Int,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = pos
+
+ while (k > 0) {
+ val parent = (k - 1) ushr 1
+ val parentDeadline = deadlines[parent]
+
+ if (deadline >= parentDeadline) {
+ break
+ }
+
+ deadlines[k] = parentDeadline
+ pending[k] = pending[parent]
+
+ k = parent
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+
+ /**
+ * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
+ * is less than or equal to its children or is a leaf.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftDown(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = 0
+ val size = size
+ val half = size ushr 1
+
+ while (k < half) {
+ var child = (k shl 1) + 1
+
+ var childDeadline = deadlines[child]
+ val right = child + 1
+
+ if (right < size) {
+ val rightDeadline = deadlines[right]
+
+ if (childDeadline > rightDeadline) {
+ child = right
+ childDeadline = rightDeadline
+ }
+ }
+
+ if (deadline <= childDeadline) {
+ break
+ }
+
+ deadlines[k] = childDeadline
+ pending[k] = pending[child]
+
+ k = child
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
new file mode 100644
index 00000000..d990dc61
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import org.opendc.simulator.flow.FlowCounters
+
+/**
+ * Mutable implementation of the [FlowCounters] interface.
+ */
+public class MutableFlowCounters : FlowCounters {
+ override val demand: Double
+ get() = _counters[0]
+ override val actual: Double
+ get() = _counters[1]
+ override val remaining: Double
+ get() = _counters[2]
+ override val interference: Double
+ get() = _counters[3]
+ private val _counters = DoubleArray(4)
+
+ override fun reset() {
+ _counters.fill(0.0)
+ }
+
+ public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) {
+ val counters = _counters
+ counters[0] += demand
+ counters[1] += actual
+ counters[2] += remaining
+ counters[3] += interference
+ }
+
+ override fun toString(): String {
+ return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
new file mode 100644
index 00000000..04ba7f21
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowCounters
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.interference.InterferenceKey
+
+/**
+ * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
+ */
+public interface FlowMultiplexer {
+ /**
+ * The inputs of the multiplexer that can be used to consume sources.
+ */
+ public val inputs: Set<FlowConsumer>
+
+ /**
+ * The outputs of the multiplexer over which the flows will be distributed.
+ */
+ public val outputs: Set<FlowSource>
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ public val rate: Double
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ public val demand: Double
+
+ /**
+ * The capacity of the outputs.
+ */
+ public val capacity: Double
+
+ /**
+ * The flow counters to track the flow metrics of all multiplexer inputs.
+ */
+ public val counters: FlowCounters
+
+ /**
+ * Create a new input on this multiplexer.
+ *
+ * @param key The key of the interference member to which the input belongs.
+ */
+ public fun newInput(key: InterferenceKey? = null): FlowConsumer
+
+ /**
+ * Remove [input] from this multiplexer.
+ */
+ public fun removeInput(input: FlowConsumer)
+
+ /**
+ * Create a new output on this multiplexer.
+ */
+ public fun newOutput(): FlowSource
+
+ /**
+ * Remove [output] from this multiplexer.
+ */
+ public fun removeOutput(output: FlowSource)
+
+ /**
+ * Clear all inputs and outputs from the multiplexer.
+ */
+ public fun clear()
+
+ /**
+ * Clear the inputs of the multiplexer.
+ */
+ public fun clearInputs()
+
+ /**
+ * Clear the outputs of the multiplexer.
+ */
+ public fun clearOutputs()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
new file mode 100644
index 00000000..125d10fe
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
+import java.util.ArrayDeque
+
+/**
+ * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
+ * that a single input is directly connected to an output and that the multiplexer can only support as many
+ * inputs as outputs.
+ *
+ * @param engine The [FlowEngine] driving the simulation.
+ */
+public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer {
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+
+ override val outputs: Set<FlowSource>
+ get() = _outputs
+ private val _outputs = mutableSetOf<Output>()
+ private val _availableOutputs = ArrayDeque<Output>()
+
+ override val counters: FlowCounters = object : FlowCounters {
+ override val demand: Double
+ get() = _outputs.sumOf { it.forwarder.counters.demand }
+ override val actual: Double
+ get() = _outputs.sumOf { it.forwarder.counters.actual }
+ override val remaining: Double
+ get() = _outputs.sumOf { it.forwarder.counters.remaining }
+ override val interference: Double
+ get() = _outputs.sumOf { it.forwarder.counters.interference }
+
+ override fun reset() {
+ for (output in _outputs) {
+ output.forwarder.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]"
+ }
+
+ override val rate: Double
+ get() = _outputs.sumOf { it.forwarder.rate }
+
+ override val demand: Double
+ get() = _outputs.sumOf { it.forwarder.demand }
+
+ override val capacity: Double
+ get() = _outputs.sumOf { it.forwarder.capacity }
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val input = Input(output)
+ _inputs += input
+ return input
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+
+ val output = (input as Input).output
+ output.forwarder.cancel()
+ _availableOutputs += output
+ }
+
+ override fun newOutput(): FlowSource {
+ val forwarder = FlowForwarder(engine)
+ val output = Output(forwarder)
+
+ _outputs += output
+ return output
+ }
+
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ val forwarder = (output as Output).forwarder
+ forwarder.close()
+ }
+
+ override fun clearInputs() {
+ for (input in _inputs) {
+ val output = input.output
+ output.forwarder.cancel()
+ _availableOutputs += output
+ }
+
+ _inputs.clear()
+ }
+
+ override fun clearOutputs() {
+ for (output in _outputs) {
+ output.forwarder.cancel()
+ }
+ _outputs.clear()
+ _availableOutputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
+ }
+
+ /**
+ * An input on the multiplexer.
+ */
+ private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder {
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+
+ /**
+ * An output on the multiplexer.
+ */
+ private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ _availableOutputs += this
+ forwarder.onStart(conn, now)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ forwarder.cancel()
+ forwarder.onStop(conn, now, delta)
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Output"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
new file mode 100644
index 00000000..a0fb8a4e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -0,0 +1,789 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceDomain
+import org.opendc.simulator.flow.interference.InterferenceKey
+import org.opendc.simulator.flow.internal.D_MS_TO_S
+import org.opendc.simulator.flow.internal.MutableFlowCounters
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
+ *
+ * @param engine The [FlowEngine] to drive the flow simulation.
+ * @param parent The parent flow system of the multiplexer.
+ * @param interferenceDomain The interference domain of the multiplexer.
+ */
+public class MaxMinFlowMultiplexer(
+ private val engine: FlowEngine,
+ parent: FlowConvergenceListener? = null,
+ private val interferenceDomain: InterferenceDomain? = null
+) : FlowMultiplexer {
+ /**
+ * The inputs of the multiplexer.
+ */
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+
+ /**
+ * The outputs of the multiplexer.
+ */
+ override val outputs: Set<FlowSource>
+ get() = _outputs
+ private val _outputs = mutableSetOf<Output>()
+
+ /**
+ * The flow counters of this multiplexer.
+ */
+ public override val counters: FlowCounters
+ get() = scheduler.counters
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ public override val rate: Double
+ get() = scheduler.rate
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ public override val demand: Double
+ get() = scheduler.demand
+
+ /**
+ * The capacity of the outputs.
+ */
+ public override val capacity: Double
+ get() = scheduler.capacity
+
+ /**
+ * The [Scheduler] instance of this multiplexer.
+ */
+ private val scheduler = Scheduler(engine, parent)
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity)
+ _inputs.add(provider)
+ return provider
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+ // This cast should always succeed since only `Input` instances should be added to `_inputs`
+ (input as Input).close()
+ }
+
+ override fun newOutput(): FlowSource {
+ val output = Output(scheduler)
+ _outputs.add(output)
+ return output
+ }
+
+ override fun removeOutput(output: FlowSource) {
+ if (!_outputs.remove(output)) {
+ return
+ }
+
+ // This cast should always succeed since only `Output` instances should be added to `_outputs`
+ (output as Output).cancel()
+ }
+
+ override fun clearInputs() {
+ for (input in _inputs) {
+ input.cancel()
+ }
+ _inputs.clear()
+ }
+
+ override fun clearOutputs() {
+ for (output in _outputs) {
+ output.cancel()
+ }
+ _outputs.clear()
+ }
+
+ override fun clear() {
+ clearOutputs()
+ clearInputs()
+ }
+
+ /**
+ * Helper class containing the scheduler state.
+ */
+ private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) {
+ /**
+ * The flow counters of this scheduler.
+ */
+ @JvmField val counters = MutableFlowCounters()
+
+ /**
+ * The flow rate of the multiplexer.
+ */
+ @JvmField var rate = 0.0
+
+ /**
+ * The demand for the multiplexer.
+ */
+ @JvmField var demand = 0.0
+
+ /**
+ * The capacity of the multiplexer.
+ */
+ @JvmField var capacity = 0.0
+
+ /**
+ * An [Output] that is used to activate the scheduler.
+ */
+ @JvmField var activationOutput: Output? = null
+
+ /**
+ * The active inputs registered with the scheduler.
+ */
+ private val _activeInputs = mutableListOf<Input>()
+
+ /**
+ * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
+ */
+ private var _inputArray = emptyArray<Input>()
+
+ /**
+ * The active outputs registered with the scheduler.
+ */
+ private val _activeOutputs = mutableListOf<Output>()
+
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
+
+ /**
+ * The simulation clock.
+ */
+ private val _clock = engine.clock
+
+ /**
+ * Register the specified [input] to this scheduler.
+ */
+ fun registerInput(input: Input) {
+ _activeInputs.add(input)
+ _inputArray = _activeInputs.toTypedArray()
+
+ val hasActivationOutput = activationOutput != null
+
+ // Disable timers and convergence of the source if one of the output manages it
+ input.shouldConsumerConverge = !hasActivationOutput
+ input.enableTimers = !hasActivationOutput
+ input.capacity = capacity
+
+ trigger(_clock.millis())
+ }
+
+ /**
+ * De-register the specified [input] from this scheduler.
+ */
+ fun deregisterInput(input: Input, now: Long) {
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == input) {
+ _lastConvergeInput = null
+ }
+
+ _activeInputs.remove(input)
+
+ // Re-run scheduler to distribute new load
+ trigger(now)
+ }
+
+ /**
+ * This method is invoked when one of the inputs converges.
+ */
+ fun convergeInput(input: Input, now: Long) {
+
+ val lastConverge = _lastConverge
+ val lastConvergeInput = _lastConvergeInput
+ val parent = parent
+
+ if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) {
+ _lastConverge = now
+ _lastConvergeInput = input
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
+ }
+
+ /**
+ * Register the specified [output] to this scheduler.
+ */
+ fun registerOutput(output: Output) {
+ _activeOutputs.add(output)
+
+ updateCapacity()
+ updateActivationOutput()
+ }
+
+ /**
+ * De-register the specified [output] from this scheduler.
+ */
+ fun deregisterOutput(output: Output, now: Long) {
+ _activeOutputs.remove(output)
+ updateCapacity()
+
+ trigger(now)
+ }
+
+ /**
+ * This method is invoked when one of the outputs converges.
+ */
+ fun convergeOutput(output: Output, now: Long) {
+ val lastConverge = _lastConverge
+ val parent = parent
+
+ if (parent != null) {
+ _lastConverge = now
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
+
+ if (!output.isActive) {
+ output.isActivationOutput = false
+ updateActivationOutput()
+ }
+ }
+
+ /**
+ * Trigger the scheduler of the multiplexer.
+ *
+ * @param now The current virtual timestamp of the simulation.
+ */
+ fun trigger(now: Long) {
+ if (_schedulerActive) {
+ // No need to trigger the scheduler in case it is already active
+ return
+ }
+
+ val activationOutput = activationOutput
+
+ // We can run the scheduler in two ways:
+ // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
+ // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
+ // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
+ // a few inputs and little changes at the same timestamp.
+ // We always pick for option (1) unless there are no outputs available.
+ if (activationOutput != null) {
+ activationOutput.pull(now)
+ return
+ } else {
+ runScheduler(now)
+ }
+ }
+
+ /**
+ * Synchronously run the scheduler of the multiplexer.
+ */
+ fun runScheduler(now: Long): Long {
+ val lastSchedulerCycle = _lastSchedulerCycle
+ _lastSchedulerCycle = now
+
+ val delta = max(0, now - lastSchedulerCycle)
+
+ return try {
+ _schedulerActive = true
+ doRunScheduler(now, delta)
+ } finally {
+ _schedulerActive = false
+ }
+ }
+
+ /**
+ * Recompute the capacity of the multiplexer.
+ */
+ fun updateCapacity() {
+ val newCapacity = _activeOutputs.sumOf(Output::capacity)
+
+ // No-op if the capacity is unchanged
+ if (capacity == newCapacity) {
+ return
+ }
+
+ capacity = newCapacity
+
+ for (input in _activeInputs) {
+ input.capacity = newCapacity
+ }
+
+ // Sort outputs by their capacity
+ _activeOutputs.sort()
+ }
+
+ /**
+ * Updates the output that is used for scheduler activation.
+ */
+ private fun updateActivationOutput() {
+ val output = _activeOutputs.firstOrNull()
+ activationOutput = output
+
+ if (output != null) {
+ output.isActivationOutput = true
+ }
+
+ val hasActivationOutput = output != null
+
+ for (input in _activeInputs) {
+ input.shouldConsumerConverge = !hasActivationOutput
+ input.enableTimers = !hasActivationOutput
+ }
+ }
+
+ /**
+ * Schedule the inputs over the outputs.
+ *
+ * @return The deadline after which a new scheduling cycle should start.
+ */
+ private fun doRunScheduler(now: Long, delta: Long): Long {
+ val activeInputs = _activeInputs
+ val activeOutputs = _activeOutputs
+ var inputArray = _inputArray
+ var inputSize = _inputArray.size
+
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
+ // If there is no work yet, mark the inputs as idle.
+ if (inputSize == 0) {
+ demand = 0.0
+ rate = 0.0
+ return Long.MAX_VALUE
+ }
+
+ val capacity = capacity
+ var availableCapacity = capacity
+ var deadline = Long.MAX_VALUE
+ var demand = 0.0
+ var shouldRebuild = false
+
+ // Pull in the work of the inputs
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
+
+ input.pullSync(now)
+
+ // Remove inputs that have finished
+ if (!input.isActive) {
+ input.actualRate = 0.0
+ shouldRebuild = true
+ } else {
+ demand += input.limit
+ deadline = min(deadline, input.deadline)
+ }
+ }
+
+ // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
+ if (shouldRebuild) {
+ inputArray = activeInputs.toTypedArray()
+ inputSize = inputArray.size
+ _inputArray = inputArray
+ }
+
+ val rate = if (demand > capacity) {
+ // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
+ // constrained capacity across the inputs.
+
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ inputArray.sort()
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
+ val availableShare = availableCapacity / (inputSize - i)
+ val grantedRate = min(input.allowedRate, availableShare)
+
+ availableCapacity -= grantedRate
+ input.actualRate = grantedRate
+ }
+
+ capacity - availableCapacity
+ } else {
+ demand
+ }
+
+ this.demand = demand
+ if (this.rate != rate) {
+ // Only update the outputs if the output rate has changed
+ this.rate = rate
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
+ }
+
+ return deadline
+ }
+
+ /**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
+
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = capacity
+
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta * D_MS_TO_S
+ val demand = demand
+ val rate = rate
+
+ counters.increment(
+ demand = demand * deltaS,
+ actual = rate * deltaS,
+ remaining = (previousCapacity - rate) * deltaS,
+ interference = 0.0
+ )
+ }
+ }
+
+ /**
+ * An internal [FlowConsumer] implementation for multiplexer inputs.
+ */
+ private class Input(
+ private val engine: FlowEngine,
+ private val scheduler: Scheduler,
+ private val interferenceDomain: InterferenceDomain?,
+ @JvmField val key: InterferenceKey?,
+ initialCapacity: Double,
+ ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> {
+ /**
+ * A flag to indicate that the consumer is active.
+ */
+ override val isActive: Boolean
+ get() = _ctx != null
+
+ /**
+ * The demand of the consumer.
+ */
+ override val demand: Double
+ get() = limit
+
+ /**
+ * The processing rate of the consumer.
+ */
+ override val rate: Double
+ get() = actualRate
+
+ /**
+ * The capacity of the input.
+ */
+ override var capacity: Double
+ get() = _capacity
+ set(value) {
+ allowedRate = min(limit, value)
+ _capacity = value
+ _ctx?.capacity = value
+ }
+ private var _capacity = initialCapacity
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ override val counters: FlowCounters
+ get() = _counters
+ private val _counters = MutableFlowCounters()
+
+ /**
+ * A flag to enable timers for the input.
+ */
+ var enableTimers: Boolean = true
+ set(value) {
+ field = value
+ _ctx?.enableTimers = value
+ }
+
+ /**
+ * A flag to control whether the input should converge.
+ */
+ var shouldConsumerConverge: Boolean = true
+ set(value) {
+ field = value
+ _ctx?.shouldConsumerConverge = value
+ }
+
+ /**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ @JvmField var allowedRate: Double = 0.0
+
+ /**
+ * The deadline of the input.
+ */
+ val deadline: Long
+ get() = _ctx?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * The [FlowConsumerContext] that is currently running.
+ */
+ private var _ctx: FlowConsumerContext? = null
+
+ /**
+ * A flag to indicate that the input is closed.
+ */
+ private var _isClosed: Boolean = false
+
+ /**
+ * Close the input.
+ *
+ * This method is invoked when the user removes an input from the switch.
+ */
+ fun close() {
+ _isClosed = true
+ cancel()
+ }
+
+ /**
+ * Pull the source if necessary.
+ */
+ fun pullSync(now: Long) {
+ _ctx?.pullSync(now)
+ }
+
+ /* FlowConsumer */
+ override fun startConsumer(source: FlowSource) {
+ check(!_isClosed) { "Cannot re-use closed input" }
+ check(_ctx == null) { "Consumer is in invalid state" }
+
+ val ctx = engine.newContext(source, this)
+ _ctx = ctx
+
+ ctx.capacity = capacity
+ scheduler.registerInput(this)
+
+ ctx.start()
+ }
+
+ override fun pull() {
+ _ctx?.pull()
+ }
+
+ override fun cancel() {
+ _ctx?.close()
+ }
+
+ /* FlowConsumerLogic */
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ doUpdateCounters(delta)
+
+ val allowed = min(rate, capacity)
+ limit = rate
+ actualRate = allowed
+ allowedRate = allowed
+
+ scheduler.trigger(now)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
+ doUpdateCounters(delta)
+
+ limit = 0.0
+ actualRate = 0.0
+ allowedRate = 0.0
+
+ scheduler.deregisterInput(this, now)
+
+ _ctx = null
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ scheduler.convergeInput(this, now)
+ }
+
+ /* Comparable */
+ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
+
+ /**
+ * Helper method to update the flow counters of the multiplexer.
+ */
+ private fun doUpdateCounters(delta: Long) {
+ if (delta <= 0L) {
+ return
+ }
+
+ // Compute the performance penalty due to flow interference
+ val perfScore = if (interferenceDomain != null) {
+ val load = scheduler.rate / scheduler.capacity
+ interferenceDomain.apply(key, load)
+ } else {
+ 1.0
+ }
+
+ val actualRate = actualRate
+
+ val deltaS = delta * D_MS_TO_S
+ val demand = limit * deltaS
+ val actual = actualRate * deltaS
+ val remaining = (_capacity - actualRate) * deltaS
+ val interference = actual * max(0.0, 1 - perfScore)
+
+ _counters.increment(demand, actual, remaining, interference)
+ scheduler.counters.increment(0.0, 0.0, 0.0, interference)
+ }
+ }
+
+ /**
+ * An internal [FlowSource] implementation for multiplexer outputs.
+ */
+ private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> {
+ /**
+ * The active [FlowConnection] of this source.
+ */
+ private var _conn: FlowConnection? = null
+
+ /**
+ * The capacity of this output.
+ */
+ @JvmField var capacity: Double = 0.0
+
+ /**
+ * A flag to indicate that this output is the activation output.
+ */
+ var isActivationOutput: Boolean
+ get() = _isActivationOutput
+ set(value) {
+ _isActivationOutput = value
+ _conn?.shouldSourceConverge = value
+ }
+ private var _isActivationOutput: Boolean = false
+
+ /**
+ * A flag to indicate that the output is active.
+ */
+ @JvmField var isActive = false
+
+ /**
+ * Push the specified rate to the consumer.
+ */
+ fun push(rate: Double) {
+ _conn?.push(rate)
+ }
+
+ /**
+ * Cancel this output.
+ */
+ fun cancel() {
+ _conn?.close()
+ }
+
+ /**
+ * Pull this output.
+ */
+ fun pull(now: Long) {
+ _conn?.pull(now)
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ assert(_conn == null) { "Source running concurrently" }
+ _conn = conn
+ capacity = conn.capacity
+ isActive = true
+
+ scheduler.registerOutput(this)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _conn = null
+ capacity = 0.0
+ isActive = false
+
+ scheduler.deregisterOutput(this, now)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val capacity = capacity
+ if (capacity != conn.capacity) {
+ this.capacity = capacity
+ scheduler.updateCapacity()
+ }
+
+ return if (_isActivationOutput) {
+ // If this output is the activation output, synchronously run the scheduler and return the new deadline
+ val deadline = scheduler.runScheduler(now)
+ if (deadline == Long.MAX_VALUE)
+ deadline
+ else
+ deadline - now
+ } else {
+ // Output is not the activation output, so trigger activation output and do not install timer for this
+ // output (by returning `Long.MAX_VALUE`)
+ scheduler.trigger(now)
+
+ Long.MAX_VALUE
+ }
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ if (_isActivationOutput) {
+ scheduler.convergeOutput(this, now)
+ }
+ }
+
+ override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
new file mode 100644
index 00000000..d9779c6a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+import kotlin.math.roundToLong
+
+/**
+ * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
+ */
+public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
+
+ init {
+ require(amount >= 0.0) { "Amount must be positive" }
+ require(utilization > 0.0) { "Utilization must be positive" }
+ }
+
+ private var remainingAmount = amount
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumed = conn.rate * delta / 1000.0
+ val limit = conn.capacity * utilization
+
+ remainingAmount -= consumed
+
+ val duration = (remainingAmount / limit * 1000).roundToLong()
+
+ return if (duration > 0) {
+ conn.push(limit)
+ duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
new file mode 100644
index 00000000..b3191ad3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+/**
+ * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
+ * finish a pull, before proceeding its operation.
+ */
+public class FlowSourceBarrier(public val parties: Int) {
+ private var counter = 0
+
+ /**
+ * Enter the barrier and determine whether the caller is the last to reach the barrier.
+ *
+ * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ */
+ public fun enter(): Boolean {
+ val last = ++counter == parties
+ if (last) {
+ counter = 0
+ return true
+ }
+ return false
+ }
+
+ /**
+ * Reset the barrier.
+ */
+ public fun reset() {
+ counter = 0
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
new file mode 100644
index 00000000..6dd60d95
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+
+/**
+ * Helper class to expose an observable [rate] field describing the flow rate of the source.
+ */
+public class FlowSourceRateAdapter(
+ private val delegate: FlowSource,
+ private val callback: (Double) -> Unit = {}
+) : FlowSource by delegate {
+ /**
+ * The resource processing speed at this instant.
+ */
+ public var rate: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.shouldSourceConverge = true
+
+ delegate.onStart(conn, now)
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate.onStop(conn, now, delta)
+ } finally {
+ rate = 0.0
+ }
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ try {
+ delegate.onConverge(conn, now, delta)
+ } finally {
+ rate = conn.rate
+ }
+ }
+
+ override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
new file mode 100644
index 00000000..ae537845
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+
+/**
+ * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
+ */
+public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
+ private var _iterator: Iterator<Fragment>? = null
+ private var _nextTarget = Long.MIN_VALUE
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ check(_iterator == null) { "Source already running" }
+ _iterator = trace.iterator()
+ }
+
+ override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
+ _iterator = null
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ // Check whether the trace fragment was fully consumed, otherwise wait until we have done so
+ val nextTarget = _nextTarget
+ if (nextTarget > now) {
+ return now - nextTarget
+ }
+
+ val iterator = checkNotNull(_iterator)
+ return if (iterator.hasNext()) {
+ val fragment = iterator.next()
+ _nextTarget = now + fragment.duration
+ conn.push(fragment.usage)
+ fragment.duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+
+ /**
+ * A fragment of the trace.
+ */
+ public data class Fragment(val duration: Long, val usage: Double)
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
new file mode 100644
index 00000000..fe39eb2c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+
+/**
+ * A test suite for the [FlowConsumerContextImpl] class.
+ */
+class FlowConsumerContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ engine.scheduleSync(engine.clock.millis(), context)
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(0.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
+ }
+
+ @Test
+ fun testIdempotentCapacityChange() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 4200.0
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 1) { consumer.onPull(any(), any(), any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
new file mode 100644
index 00000000..12e72b8f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -0,0 +1,321 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Disabled
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+
+/**
+ * A test suite for the [FlowForwarder] class.
+ */
+internal class FlowForwarderTest {
+ @Test
+ fun testCancelImmediately() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ })
+
+ forwarder.close()
+ source.cancel()
+ }
+
+ @Test
+ fun testCancel() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(object : FlowSource {
+ var isFirst = true
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 10 * 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ forwarder.close()
+ source.cancel()
+ }
+
+ @Test
+ fun testState() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ }
+
+ assertFalse(forwarder.isActive)
+
+ forwarder.startConsumer(consumer)
+ assertTrue(forwarder.isActive)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertFalse(forwarder.isActive)
+
+ forwarder.close()
+ assertFalse(forwarder.isActive)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ })
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onStop(any(), any(), any()) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onStart(any(), any()) }
+ verify(exactly = 1) { consumer.onStop(any(), any(), any()) }
+ }
+
+ @Test
+ fun testExitPropagation() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ }
+
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+ yield()
+
+ assertFalse(forwarder.isActive)
+ }
+
+ @Test
+ @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val sink = FlowSink(engine, 1.0)
+
+ val source = spyk(FixedFlowSource(2.0, 1.0))
+ sink.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(source) }
+ delay(1000)
+ sink.capacity = 0.5
+ }
+
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { source.onPull(any(), any(), any()) }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
+
+ val consumer = FixedFlowSource(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ yield()
+
+ assertEquals(2.0, source.counters.actual)
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
+
+ @Test
+ fun testCoupledExit() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(FixedFlowSource(2000.0, 1.0))
+
+ yield()
+
+ assertFalse(source.isActive)
+ }
+
+ @Test
+ fun testPullFailureCoupled() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertFalse(source.isActive)
+ }
+
+ @Test
+ fun testStartFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertTrue(source.isActive)
+ source.cancel()
+ }
+
+ @Test
+ fun testConvergeFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ try {
+ forwarder.consume(object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.shouldSourceConverge = true
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ throw IllegalStateException("Test")
+ }
+ })
+ } catch (cause: Throwable) {
+ // Ignore
+ }
+
+ yield()
+
+ assertTrue(source.isActive)
+ source.cancel()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
new file mode 100644
index 00000000..70c75864
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+
+/**
+ * A test suite for the [FlowSink] class.
+ */
+internal class FlowSinkTest {
+ @Test
+ fun testSpeed() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(4200.0, 1.0)
+
+ val res = mutableListOf<Double>()
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(engine, 1.0)
+
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
+
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, clock.millis())
+ verify(exactly = 3) { consumer.onPull(any(), any(), any()) }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 2.0)
+
+ val res = mutableListOf<Double>()
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
+ * [FlowSource.onPull].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ conn.pull()
+ }
+ }
+
+ provider.consume(consumer)
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+ lateinit var resCtx: FlowConnection
+
+ val consumer = object : FlowSource {
+ var isFirst = true
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ resCtx = conn
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 4000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ launch {
+ yield()
+ resCtx.pull()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ }
+
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onStart(conn: FlowConnection, now: Long) {
+ throw IllegalStateException("Hi")
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return Long.MAX_VALUE
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ var isFirst = true
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 1000
+ } else {
+ throw IllegalStateException()
+ }
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 1.0)
+
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testCancelDuringConsumption() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 1.0)
+
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.cancel()
+
+ yield()
+
+ assertEquals(500, clock.millis())
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ }
+
+ provider.consume(consumer)
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
new file mode 100644
index 00000000..187dacd9
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+import org.opendc.simulator.flow.source.TraceFlowSource
+
+/**
+ * Test suite for the [ForwardingFlowMultiplexer] class.
+ */
+internal class ForwardingFlowMultiplexerTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+ val forwarder = FlowForwarder(engine)
+ val adapter = FlowSourceRateAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ forwarder.startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = FixedFlowSource(duration * 3.2, 1.0)
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ source.startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+
+ assertEquals(duration, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : FlowSource {
+ var isFirst = true
+
+ override fun onStart(conn: FlowConnection, now: Long) {
+ isFirst = true
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ source.startConsumer(switch.newOutput())
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ source.startConsumer(switch.newOutput())
+
+ switch.newInput()
+ assertThrows<IllegalStateException> { switch.newInput() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
new file mode 100644
index 00000000..6e2cdb98
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.TraceFlowSource
+
+/**
+ * Test suite for the [FlowMultiplexer] implementations
+ */
+internal class MaxMinFlowMultiplexerTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(scheduler)
+
+ val sources = List(2) { FlowSink(scheduler, 2000.0) }
+ sources.forEach { it.startConsumer(switch.newOutput()) }
+
+ val provider = switch.newInput()
+ val consumer = FixedFlowSource(2000.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.clear()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L
+ val workload =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
+ val provider = switch.newInput()
+
+ try {
+ sink.startConsumer(switch.newOutput())
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.clear()
+ }
+
+ assertAll(
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L
+ val workloadA =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3100.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val sink = FlowSink(scheduler, 3200.0)
+ val providerA = switch.newInput()
+ val providerB = switch.newInput()
+
+ try {
+ sink.startConsumer(switch.newOutput())
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.clear()
+ }
+ assertAll(
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
new file mode 100644
index 00000000..8396d346
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+
+/**
+ * A test suite for the [FixedFlowSource] class.
+ */
+internal class FixedFlowSourceTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
+
+ val consumer = FixedFlowSource(1.0, 1.0)
+
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testUtilization() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
+
+ val consumer = FixedFlowSource(1.0, 0.5)
+
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
+ }
+}