summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/build.gradle.kts38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt140
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt143
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt60
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt109
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt45
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt56
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt53
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt95
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt48
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt217
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt61
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt58
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt43
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt19
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt28
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt356
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt46
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt297
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt70
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt127
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt399
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt57
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt52
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt82
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt72
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt152
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt222
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt240
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt157
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt147
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt57
32 files changed, 3746 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
new file mode 100644
index 00000000..5a956fee
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+description = "High-performance flow simulator"
+
+plugins {
+ `kotlin-library-conventions`
+ `testing-conventions`
+ `jacoco-conventions`
+ `benchmark-conventions`
+}
+
+dependencies {
+ api(platform(projects.opendcPlatform))
+ api(libs.kotlinx.coroutines)
+ implementation(projects.opendcUtils)
+
+ testImplementation(projects.opendcSimulator.opendcSimulatorCore)
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
new file mode 100644
index 00000000..4834f10f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.launch
+import org.opendc.simulator.core.SimulationCoroutineScope
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer
+import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer
+import org.opendc.simulator.flow.source.TraceFlowSource
+import org.openjdk.jmh.annotations.*
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.TimeUnit
+
+@State(Scope.Thread)
+@Fork(1)
+@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@OptIn(ExperimentalCoroutinesApi::class)
+class FlowBenchmarks {
+ private lateinit var scope: SimulationCoroutineScope
+ private lateinit var engine: FlowEngine
+
+ @Setup
+ fun setUp() {
+ scope = SimulationCoroutineScope()
+ engine = FlowEngine(scope.coroutineContext, scope.clock)
+ }
+
+ @State(Scope.Thread)
+ class Workload {
+ lateinit var trace: Sequence<TraceFlowSource.Fragment>
+
+ @Setup
+ fun setUp() {
+ val random = ThreadLocalRandom.current()
+ val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) }
+ trace = entries.asSequence()
+ }
+ }
+
+ @Benchmark
+ fun benchmarkSink(state: Workload) {
+ return scope.runBlockingSimulation {
+ val provider = FlowSink(engine, 4200.0)
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkForward(state: Workload) {
+ return scope.runBlockingSimulation {
+ val provider = FlowSink(engine, 4200.0)
+ val forwarder = FlowForwarder(engine)
+ provider.startConsumer(forwarder)
+ return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxMaxMinSingleSource(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = MaxMinFlowMultiplexer(engine)
+
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
+
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxMaxMinTripleSource(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = MaxMinFlowMultiplexer(engine)
+
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
+
+ repeat(3) {
+ launch {
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(state.trace))
+ }
+ }
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxExclusiveSingleSource(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = ForwardingFlowMultiplexer(engine)
+
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
+
+ val provider = switch.newInput()
+ return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace))
+ }
+ }
+
+ @Benchmark
+ fun benchmarkMuxExclusiveTripleSource(state: Workload) {
+ return scope.runBlockingSimulation {
+ val switch = ForwardingFlowMultiplexer(engine)
+
+ switch.addOutput(FlowSink(engine, 3000.0))
+ switch.addOutput(FlowSink(engine, 3000.0))
+
+ repeat(2) {
+ launch {
+ val provider = switch.newInput()
+ provider.consume(TraceFlowSource(state.trace))
+ }
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
new file mode 100644
index 00000000..c8092082
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import org.opendc.simulator.flow.internal.FlowCountersImpl
+
+/**
+ * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations.
+ */
+public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer {
+ /**
+ * A flag to indicate that the flow consumer is active.
+ */
+ public override val isActive: Boolean
+ get() = ctx != null
+
+ /**
+ * The capacity of the consumer.
+ */
+ public override var capacity: Double = initialCapacity
+ set(value) {
+ field = value
+ ctx?.capacity = value
+ }
+
+ /**
+ * The current processing rate of the consumer.
+ */
+ public override val rate: Double
+ get() = ctx?.rate ?: 0.0
+
+ /**
+ * The flow processing rate demand at this instant.
+ */
+ public override val demand: Double
+ get() = ctx?.demand ?: 0.0
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = FlowCountersImpl()
+
+ /**
+ * The [FlowConsumerContext] that is currently running.
+ */
+ protected var ctx: FlowConsumerContext? = null
+ private set
+
+ /**
+ * Construct the [FlowConsumerLogic] instance for a new source.
+ */
+ protected abstract fun createLogic(): FlowConsumerLogic
+
+ /**
+ * Start the specified [FlowConsumerContext].
+ */
+ protected open fun start(ctx: FlowConsumerContext) {
+ ctx.start()
+ }
+
+ /**
+ * The previous demand for the consumer.
+ */
+ private var previousDemand = 0.0
+
+ /**
+ * Update the counters of the flow consumer.
+ */
+ protected fun updateCounters(ctx: FlowConnection, delta: Long) {
+ val demand = previousDemand
+ previousDemand = ctx.demand
+
+ if (delta <= 0) {
+ return
+ }
+
+ val counters = _counters
+ val deltaS = delta / 1000.0
+ val work = demand * deltaS
+ val actualWork = ctx.rate * deltaS
+ val remainingWork = work - actualWork
+
+ counters.demand += work
+ counters.actual += actualWork
+ counters.overcommit += remainingWork
+ }
+
+ /**
+ * Update the counters of the flow consumer.
+ */
+ protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) {
+ val counters = _counters
+ counters.demand += demand
+ counters.actual += actual
+ counters.overcommit += overcommit
+ }
+
+ final override fun startConsumer(source: FlowSource) {
+ check(ctx == null) { "Consumer is in invalid state" }
+ val ctx = engine.newContext(source, createLogic())
+
+ ctx.capacity = capacity
+ this.ctx = ctx
+
+ start(ctx)
+ }
+
+ final override fun pull() {
+ ctx?.pull()
+ }
+
+ final override fun cancel() {
+ val ctx = ctx
+ if (ctx != null) {
+ this.ctx = null
+ ctx.close()
+ }
+ }
+
+ override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]"
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
new file mode 100644
index 00000000..fa833961
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * An active connection between a [FlowSource] and [FlowConsumer].
+ */
+public interface FlowConnection : AutoCloseable {
+ /**
+ * The capacity of the connection.
+ */
+ public val capacity: Double
+
+ /**
+ * The flow rate over the connection.
+ */
+ public val rate: Double
+
+ /**
+ * The flow demand of the source.
+ */
+ public val demand: Double
+
+ /**
+ * Pull the source.
+ */
+ public fun pull()
+
+ /**
+ * Push the given flow [rate] over this connection.
+ *
+ * @param rate The rate of the flow to push.
+ */
+ public fun push(rate: Double)
+
+ /**
+ * Disconnect the consumer from its source.
+ */
+ public override fun close()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
new file mode 100644
index 00000000..3a6e2e97
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import kotlinx.coroutines.suspendCancellableCoroutine
+import kotlin.coroutines.resume
+import kotlin.coroutines.resumeWithException
+
+/**
+ * A consumer of a [FlowSource].
+ */
+public interface FlowConsumer {
+ /**
+ * A flag to indicate that the consumer is currently consuming a [FlowSource].
+ */
+ public val isActive: Boolean
+
+ /**
+ * The flow capacity of this consumer.
+ */
+ public val capacity: Double
+
+ /**
+ * The current flow rate of the consumer.
+ */
+ public val rate: Double
+
+ /**
+ * The current flow demand.
+ */
+ public val demand: Double
+
+ /**
+ * The flow counters to track the flow metrics of the consumer.
+ */
+ public val counters: FlowCounters
+
+ /**
+ * Start consuming the specified [source].
+ *
+ * @throws IllegalStateException if the consumer is already active.
+ */
+ public fun startConsumer(source: FlowSource)
+
+ /**
+ * Ask the consumer to pull its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
+ */
+ public fun pull()
+
+ /**
+ * Disconnect the consumer from its source.
+ *
+ * If the consumer is not active, this operation will be a no-op.
+ */
+ public fun cancel()
+}
+
+/**
+ * Consume the specified [source] and suspend execution until the source is fully consumed or failed.
+ */
+public suspend fun FlowConsumer.consume(source: FlowSource) {
+ return suspendCancellableCoroutine { cont ->
+ startConsumer(object : FlowSource by source {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ source.onEvent(conn, now, event)
+
+ if (event == FlowEvent.Exit && !cont.isCompleted) {
+ cont.resume(Unit)
+ }
+ }
+
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ try {
+ source.onFailure(conn, cause)
+ cont.resumeWithException(cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ cont.resumeWithException(e)
+ }
+ }
+
+ override fun toString(): String = "SuspendingFlowSource"
+ })
+
+ cont.invokeOnCancellation { cancel() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
new file mode 100644
index 00000000..75b2d25b
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A controllable [FlowConnection].
+ *
+ * This interface is used by [FlowConsumer]s to control the connection between it and the source.
+ */
+public interface FlowConsumerContext : FlowConnection {
+ /**
+ * The capacity of the connection.
+ */
+ public override var capacity: Double
+
+ /**
+ * Start the flow over the connection.
+ */
+ public fun start()
+
+ /**
+ * Synchronously flush the changes of the connection.
+ */
+ public fun flush()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
new file mode 100644
index 00000000..c69cb17e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A collection of callbacks associated with a [FlowConsumer].
+ */
+public interface FlowConsumerLogic {
+ /**
+ * This method is invoked when a [FlowSource] changes the rate of flow to this consumer.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the update is occurring.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ * @param rate The requested processing rate of the source.
+ */
+ public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {}
+
+ /**
+ * This method is invoked when the flow graph has converged into a steady-state system.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the system converged.
+ * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds.
+ */
+ public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+
+ /**
+ * This method is invoked when the [FlowSource] is completed.
+ *
+ * @param ctx The context in which the provider runs.
+ * @param now The virtual timestamp in milliseconds at which the provider finished.
+ * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds.
+ */
+ public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {}
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
new file mode 100644
index 00000000..e15d7643
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * An interface that tracks cumulative counts of the flow accumulation over a stage.
+ */
+public interface FlowCounters {
+ /**
+ * The accumulated flow that a source wanted to push over the connection.
+ */
+ public val demand: Double
+
+ /**
+ * The accumulated flow that was actually transferred over the connection.
+ */
+ public val actual: Double
+
+ /**
+ * The accumulated flow that could not be transferred over the connection.
+ */
+ public val overcommit: Double
+
+ /**
+ * The accumulated flow lost due to interference between sources.
+ */
+ public val interference: Double
+
+ /**
+ * Reset the flow counters.
+ */
+ public fun reset()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
new file mode 100644
index 00000000..65224827
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt
@@ -0,0 +1,95 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import java.time.Clock
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s.
+ *
+ * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation
+ * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state.
+ */
+public interface FlowEngine {
+ /**
+ * The virtual [Clock] associated with this engine.
+ */
+ public val clock: Clock
+
+ /**
+ * Create a new [FlowConsumerContext] with the given [provider].
+ *
+ * @param consumer The consumer logic.
+ * @param provider The logic of the resource provider.
+ */
+ public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext
+
+ /**
+ * Start batching the execution of resource updates until [popBatch] is called.
+ *
+ * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs
+ * simultaneously) in a single state update.
+ *
+ * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the
+ * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called
+ * the same amount of times. To simplify batching, see [batch].
+ */
+ public fun pushBatch()
+
+ /**
+ * Stop the batching of resource updates and run the interpreter on the batch.
+ *
+ * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call.
+ */
+ public fun popBatch()
+
+ public companion object {
+ /**
+ * Construct a new [FlowEngine] implementation.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+ @JvmStatic
+ @JvmName("create")
+ public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine {
+ return FlowEngineImpl(context, clock)
+ }
+ }
+}
+
+/**
+ * Batch the execution of several interrupts into a single call.
+ *
+ * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update.
+ */
+public inline fun FlowEngine.batch(block: () -> Unit) {
+ try {
+ pushBatch()
+ block()
+ } finally {
+ popBatch()
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
new file mode 100644
index 00000000..14c85183
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A flow event that is communicated to a [FlowSource].
+ */
+public enum class FlowEvent {
+ /**
+ * This event is emitted to the source when it has started.
+ */
+ Start,
+
+ /**
+ * This event is emitted to the source when it is stopped.
+ */
+ Exit,
+
+ /**
+ * This event is emitted to the source when the system has converged into a steady state.
+ */
+ Converge,
+
+ /**
+ * This event is emitted to the source when the capacity of the consumer has changed.
+ */
+ Capacity,
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
new file mode 100644
index 00000000..2074033e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import org.opendc.simulator.flow.internal.FlowCountersImpl
+
+/**
+ * A class that acts as a [FlowSource] and [FlowConsumer] at the same time.
+ *
+ * @param engine The [FlowEngine] the forwarder runs in.
+ * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits.
+ */
+public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable {
+ /**
+ * The delegate [FlowSource].
+ */
+ private var delegate: FlowSource? = null
+
+ /**
+ * A flag to indicate that the delegate was started.
+ */
+ private var hasDelegateStarted: Boolean = false
+
+ /**
+ * The exposed [FlowConnection].
+ */
+ private val _ctx = object : FlowConnection {
+ override val capacity: Double
+ get() = _innerCtx?.capacity ?: 0.0
+
+ override val demand: Double
+ get() = _innerCtx?.demand ?: 0.0
+
+ override val rate: Double
+ get() = _innerCtx?.rate ?: 0.0
+
+ override fun pull() {
+ _innerCtx?.pull()
+ }
+
+ override fun push(rate: Double) {
+ _innerCtx?.push(rate)
+ _demand = rate
+ }
+
+ override fun close() {
+ val delegate = checkNotNull(delegate) { "Delegate not active" }
+
+ if (isCoupled)
+ _innerCtx?.close()
+ else
+ _innerCtx?.push(0.0)
+
+ // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we
+ // reset beforehand the existing state and check whether it has been updated afterwards
+ reset()
+
+ delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit)
+ }
+ }
+
+ /**
+ * The [FlowConnection] in which the forwarder runs.
+ */
+ private var _innerCtx: FlowConnection? = null
+
+ override val isActive: Boolean
+ get() = delegate != null
+
+ override val capacity: Double
+ get() = _ctx.capacity
+
+ override val rate: Double
+ get() = _ctx.rate
+
+ override val demand: Double
+ get() = _ctx.demand
+
+ override val counters: FlowCounters
+ get() = _counters
+ private val _counters = FlowCountersImpl()
+
+ override fun startConsumer(source: FlowSource) {
+ check(delegate == null) { "Forwarder already active" }
+
+ delegate = source
+
+ // Pull to replace the source
+ pull()
+ }
+
+ override fun pull() {
+ _ctx.pull()
+ }
+
+ override fun cancel() {
+ val delegate = delegate
+ val ctx = _innerCtx
+
+ if (delegate != null) {
+ this.delegate = null
+
+ if (ctx != null) {
+ delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit)
+ }
+ }
+ }
+
+ override fun close() {
+ val ctx = _innerCtx
+
+ if (ctx != null) {
+ this._innerCtx = null
+ ctx.pull()
+ }
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val delegate = delegate
+
+ if (!hasDelegateStarted) {
+ start()
+ }
+
+ updateCounters(conn, delta)
+
+ return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> {
+ _innerCtx = conn
+ }
+ FlowEvent.Exit -> {
+ _innerCtx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+ delegate.onEvent(this._ctx, now, FlowEvent.Exit)
+ }
+ }
+ else -> delegate?.onEvent(this._ctx, now, event)
+ }
+ }
+
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ _innerCtx = null
+
+ val delegate = delegate
+ if (delegate != null) {
+ reset()
+ delegate.onFailure(this._ctx, cause)
+ }
+ }
+
+ /**
+ * Start the delegate.
+ */
+ private fun start() {
+ val delegate = delegate ?: return
+ delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start)
+
+ hasDelegateStarted = true
+ }
+
+ /**
+ * Reset the delegate.
+ */
+ private fun reset() {
+ delegate = null
+ hasDelegateStarted = false
+ }
+
+ /**
+ * The requested flow rate.
+ */
+ private var _demand: Double = 0.0
+
+ /**
+ * Update the flow counters for the transformer.
+ */
+ private fun updateCounters(ctx: FlowConnection, delta: Long) {
+ if (delta <= 0) {
+ return
+ }
+
+ val counters = _counters
+ val deltaS = delta / 1000.0
+ val work = _demand * deltaS
+ val actualWork = ctx.rate * deltaS
+ counters.demand += work
+ counters.actual += actualWork
+ counters.overcommit += (work - actualWork)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
new file mode 100644
index 00000000..fb6ca85d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A [FlowSink] represents a sink with a fixed capacity.
+ *
+ * @param initialCapacity The initial capacity of the resource.
+ * @param engine The engine that is used for driving the flow simulation.
+ * @param parent The parent flow system.
+ */
+public class FlowSink(
+ private val engine: FlowEngine,
+ initialCapacity: Double,
+ private val parent: FlowSystem? = null
+) : AbstractFlowConsumer(engine, initialCapacity) {
+
+ override fun createLogic(): FlowConsumerLogic {
+ return object : FlowConsumerLogic {
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ updateCounters(ctx, delta)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ updateCounters(ctx, delta)
+ cancel()
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
+ }
+ }
+
+ override fun toString(): String = "FlowSink[capacity=$capacity]"
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
new file mode 100644
index 00000000..077b4d38
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A source of flow that is consumed by a [FlowConsumer].
+ *
+ * Implementations of this interface should be considered stateful and must be assumed not to be re-usable
+ * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise.
+ */
+public interface FlowSource {
+ /**
+ * This method is invoked when the source is pulled.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the pull is occurring.
+ * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds.
+ * @return The duration after which the resource consumer should be pulled again.
+ */
+ public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long
+
+ /**
+ * This method is invoked when an event has occurred.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param now The virtual timestamp in milliseconds at which the event is occurring.
+ * @param event The event that has occurred.
+ */
+ public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {}
+
+ /**
+ * This method is invoked when the source throws an exception.
+ *
+ * @param conn The connection between the source and consumer.
+ * @param cause The cause of the failure.
+ */
+ public fun onFailure(conn: FlowConnection, cause: Throwable) {}
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
new file mode 100644
index 00000000..db6aa69f
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+/**
+ * A system of possible multiple sub-resources.
+ *
+ * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the
+ * resource provider.
+ */
+public interface FlowSystem {
+ /**
+ * The parent system to which this system belongs or `null` if it has no parent.
+ */
+ public val parent: FlowSystem?
+
+ /**
+ * This method is invoked when the system has converged to a steady-state.
+ *
+ * @param timestamp The timestamp at which the system converged.
+ */
+ public fun onConverge(timestamp: Long)
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
new file mode 100644
index 00000000..aa2713b6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt
@@ -0,0 +1,19 @@
+package org.opendc.simulator.flow.interference
+
+import org.opendc.simulator.flow.FlowSource
+
+/**
+ * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur
+ * performance variability due to operating on the same resource and therefore causing interference.
+ */
+public interface InterferenceDomain {
+ /**
+ * Compute the performance score of a participant in this interference domain.
+ *
+ * @param key The participant to obtain the score of or `null` if the participant has no key.
+ * @param load The overall load on the interference domain.
+ * @return A score representing the performance score to be applied to the resource consumer, with 1
+ * meaning no influence, <1 means that performance degrades, and >1 means that performance improves.
+ */
+ public fun apply(key: InterferenceKey?, load: Double): Double
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
new file mode 100644
index 00000000..d28ebde5
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.interference
+
+/**
+ * A key that uniquely identifies a participant of an interference domain.
+ */
+public interface InterferenceKey
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
new file mode 100644
index 00000000..9f3afc4d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -0,0 +1,356 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import org.opendc.simulator.flow.*
+import java.util.ArrayDeque
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * Implementation of a [FlowConnection] managing the communication between flow sources and consumers.
+ */
+internal class FlowConsumerContextImpl(
+ private val engine: FlowEngineImpl,
+ private val source: FlowSource,
+ private val logic: FlowConsumerLogic
+) : FlowConsumerContext {
+ /**
+ * The clock to track simulation time.
+ */
+ private val _clock = engine.clock
+
+ /**
+ * The capacity of the resource.
+ */
+ override var capacity: Double = 0.0
+ set(value) {
+ val oldValue = field
+
+ // Only changes will be propagated
+ if (value != oldValue) {
+ field = value
+ onCapacityChange()
+ }
+ }
+
+ /**
+ * A flag to indicate the state of the context.
+ */
+ private var _state = State.Pending
+
+ /**
+ * The current processing speed of the resource.
+ */
+ override val rate: Double
+ get() = _rate
+ private var _rate = 0.0
+
+ /**
+ * The current resource processing demand.
+ */
+ override val demand: Double
+ get() = _limit
+
+ /**
+ * The current state of the resource context.
+ */
+ private var _limit: Double = 0.0
+ private var _activeLimit: Double = 0.0
+ private var _deadline: Long = Long.MIN_VALUE
+
+ /**
+ * A flag to indicate that an update is active.
+ */
+ private var _updateActive = false
+
+ /**
+ * The update flag indicating why the update was triggered.
+ */
+ private var _flag: Int = 0
+
+ /**
+ * The timestamp of calls to the callbacks.
+ */
+ private var _lastUpdate: Long = Long.MIN_VALUE
+ private var _lastConvergence: Long = Long.MAX_VALUE
+
+ /**
+ * The timers at which the context is scheduled to be interrupted.
+ */
+ private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque()
+
+ override fun start() {
+ check(_state == State.Pending) { "Consumer is already started" }
+ engine.batch {
+ source.onEvent(this, _clock.millis(), FlowEvent.Start)
+ _state = State.Active
+ pull()
+ }
+ }
+
+ override fun close() {
+ if (_state == State.Stopped) {
+ return
+ }
+
+ engine.batch {
+ _state = State.Stopped
+ if (!_updateActive) {
+ val now = _clock.millis()
+ val delta = max(0, now - _lastUpdate)
+ doStop(now, delta)
+
+ // FIX: Make sure the context converges
+ _flag = _flag or FLAG_INVALIDATE
+ scheduleUpdate(_clock.millis())
+ }
+ }
+ }
+
+ override fun pull() {
+ if (_state == State.Stopped) {
+ return
+ }
+
+ _flag = _flag or FLAG_INTERRUPT
+ scheduleUpdate(_clock.millis())
+ }
+
+ override fun flush() {
+ if (_state == State.Stopped) {
+ return
+ }
+
+ engine.scheduleSync(_clock.millis(), this)
+ }
+
+ override fun push(rate: Double) {
+ if (_limit == rate) {
+ return
+ }
+
+ _limit = rate
+
+ // Invalidate only if the active limit is change and no update is active
+ // If an update is active, it will already get picked up at the end of the update
+ if (_activeLimit != rate && !_updateActive) {
+ _flag = _flag or FLAG_INVALIDATE
+ scheduleUpdate(_clock.millis())
+ }
+ }
+
+ /**
+ * Determine whether the state of the resource context should be updated.
+ */
+ fun shouldUpdate(timestamp: Long): Boolean {
+ // Either the resource context is flagged or there is a pending update at this timestamp
+ return _flag != 0 || _limit != _activeLimit || _deadline == timestamp
+ }
+
+ /**
+ * Update the state of the resource context.
+ */
+ fun doUpdate(now: Long) {
+ val oldState = _state
+ if (oldState != State.Active) {
+ return
+ }
+
+ val lastUpdate = _lastUpdate
+
+ _lastUpdate = now
+ _updateActive = true
+
+ val delta = max(0, now - lastUpdate)
+
+ try {
+ val duration = source.onPull(this, now, delta)
+ val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration
+
+ // Reset update flags
+ _flag = 0
+
+ // Check whether the state has changed after [consumer.onNext]
+ when (_state) {
+ State.Active -> {
+ logic.onPush(this, now, delta, _limit)
+
+ // Schedule an update at the new deadline
+ scheduleUpdate(now, newDeadline)
+ }
+ State.Stopped -> doStop(now, delta)
+ State.Pending -> throw IllegalStateException("Illegal transition to pending state")
+ }
+
+ // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value
+ val newLimit = _limit
+
+ // Flush the changes to the flow
+ _activeLimit = newLimit
+ _deadline = newDeadline
+ _rate = min(capacity, newLimit)
+ } catch (cause: Throwable) {
+ doFail(now, delta, cause)
+ } finally {
+ _updateActive = false
+ }
+ }
+
+ /**
+ * Prune the elapsed timers from this context.
+ */
+ fun pruneTimers(now: Long) {
+ val timers = _timers
+ while (true) {
+ val head = timers.peek()
+ if (head == null || head.target > now) {
+ break
+ }
+ timers.poll()
+ }
+ }
+
+ /**
+ * Try to re-schedule the resource context in case it was skipped.
+ */
+ fun tryReschedule(now: Long) {
+ val deadline = _deadline
+ if (deadline > now && deadline != Long.MAX_VALUE) {
+ scheduleUpdate(now, deadline)
+ }
+ }
+
+ /**
+ * This method is invoked when the system converges into a steady state.
+ */
+ fun onConverge(timestamp: Long) {
+ val delta = max(0, timestamp - _lastConvergence)
+ _lastConvergence = timestamp
+
+ try {
+ if (_state == State.Active) {
+ source.onEvent(this, timestamp, FlowEvent.Converge)
+ }
+
+ logic.onConverge(this, timestamp, delta)
+ } catch (cause: Throwable) {
+ doFail(timestamp, max(0, timestamp - _lastUpdate), cause)
+ }
+ }
+
+ override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]"
+
+ /**
+ * Stop the resource context.
+ */
+ private fun doStop(now: Long, delta: Long) {
+ try {
+ source.onEvent(this, now, FlowEvent.Exit)
+ logic.onFinish(this, now, delta)
+ } catch (cause: Throwable) {
+ doFail(now, delta, cause)
+ } finally {
+ _deadline = Long.MAX_VALUE
+ _limit = 0.0
+ }
+ }
+
+ /**
+ * Fail the resource consumer.
+ */
+ private fun doFail(now: Long, delta: Long, cause: Throwable) {
+ try {
+ source.onFailure(this, cause)
+ } catch (e: Throwable) {
+ e.addSuppressed(cause)
+ e.printStackTrace()
+ }
+
+ logic.onFinish(this, now, delta)
+ }
+
+ /**
+ * Indicate that the capacity of the resource has changed.
+ */
+ private fun onCapacityChange() {
+ // Do not inform the consumer if it has not been started yet
+ if (_state != State.Active) {
+ return
+ }
+
+ engine.batch {
+ // Inform the consumer of the capacity change. This might already trigger an interrupt.
+ source.onEvent(this, _clock.millis(), FlowEvent.Capacity)
+
+ pull()
+ }
+ }
+
+ /**
+ * Schedule an update for this resource context.
+ */
+ private fun scheduleUpdate(now: Long) {
+ engine.scheduleImmediate(now, this)
+ }
+
+ /**
+ * Schedule a delayed update for this resource context.
+ */
+ private fun scheduleUpdate(now: Long, target: Long) {
+ val timers = _timers
+ if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) {
+ timers.addFirst(engine.scheduleDelayed(now, this, target))
+ }
+ }
+
+ /**
+ * The state of a resource context.
+ */
+ private enum class State {
+ /**
+ * The resource context is pending and the resource is waiting to be consumed.
+ */
+ Pending,
+
+ /**
+ * The resource context is active and the resource is currently being consumed.
+ */
+ Active,
+
+ /**
+ * The resource context is stopped and the resource cannot be consumed anymore.
+ */
+ Stopped
+ }
+
+ /**
+ * A flag to indicate that the context should be invalidated.
+ */
+ private val FLAG_INVALIDATE = 0b01
+
+ /**
+ * A flag to indicate that the context should be interrupted.
+ */
+ private val FLAG_INTERRUPT = 0b10
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
new file mode 100644
index 00000000..141d335d
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import org.opendc.simulator.flow.FlowCounters
+
+/**
+ * Mutable implementation of the [FlowCounters] interface.
+ */
+internal class FlowCountersImpl : FlowCounters {
+ override var demand: Double = 0.0
+ override var actual: Double = 0.0
+ override var overcommit: Double = 0.0
+ override var interference: Double = 0.0
+
+ override fun reset() {
+ demand = 0.0
+ actual = 0.0
+ overcommit = 0.0
+ interference = 0.0
+ }
+
+ override fun toString(): String {
+ return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
new file mode 100644
index 00000000..1a50da2c
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -0,0 +1,297 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Runnable
+import org.opendc.simulator.flow.*
+import java.time.Clock
+import java.util.*
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Internal implementation of the [FlowEngine] interface.
+ *
+ * @param context The coroutine context to use.
+ * @param clock The virtual simulation clock.
+ */
+internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine {
+ /**
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
+ */
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The queue of connection updates that are scheduled for immediate execution.
+ */
+ private val queue = ArrayDeque<FlowConsumerContextImpl>()
+
+ /**
+ * A priority queue containing the connection updates to be scheduled in the future.
+ */
+ private val futureQueue = PriorityQueue<Timer>()
+
+ /**
+ * The stack of engine invocations to occur in the future.
+ */
+ private val futureInvocations = ArrayDeque<Invocation>()
+
+ /**
+ * The systems that have been visited during the engine cycle.
+ */
+ private val visited = linkedSetOf<FlowConsumerContextImpl>()
+
+ /**
+ * The index in the batch stack.
+ */
+ private var batchIndex = 0
+
+ /**
+ * A flag to indicate that the engine is currently active.
+ */
+ private val isRunning: Boolean
+ get() = batchIndex > 0
+
+ /**
+ * Update the specified [ctx] synchronously.
+ */
+ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
+ ctx.doUpdate(now)
+ visited.add(ctx)
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runEngine(now)
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Enqueue the specified [ctx] to be updated immediately during the active engine cycle.
+ *
+ * This method should be used when the state of a flow context is invalidated/interrupted and needs to be
+ * re-computed. In case no engine is currently active, the engine will be started.
+ */
+ fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) {
+ queue.add(ctx)
+
+ // In-case the engine is already running in the call-stack, return immediately. The changes will be picked
+ // up by the active engine.
+ if (isRunning) {
+ return
+ }
+
+ try {
+ batchIndex++
+ runEngine(now)
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Schedule the engine to run at [target] to update the flow contexts.
+ *
+ * This method will override earlier calls to this method for the same [ctx].
+ *
+ * @param now The current virtual timestamp.
+ * @param ctx The flow context to which the event applies.
+ * @param target The timestamp when the interrupt should happen.
+ */
+ fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer {
+ val futureQueue = futureQueue
+
+ require(target >= now) { "Timestamp must be in the future" }
+
+ val timer = Timer(ctx, target)
+ futureQueue.add(timer)
+
+ return timer
+ }
+
+ override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
+
+ override fun pushBatch() {
+ batchIndex++
+ }
+
+ override fun popBatch() {
+ try {
+ // Flush the work if the platform is not already running
+ if (batchIndex == 1 && queue.isNotEmpty()) {
+ runEngine(clock.millis())
+ }
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Run all the enqueued actions for the specified [timestamp][now].
+ */
+ private fun runEngine(now: Long) {
+ val queue = queue
+ val futureQueue = futureQueue
+ val futureInvocations = futureInvocations
+ val visited = visited
+
+ // Remove any entries in the `futureInvocations` queue from the past
+ while (true) {
+ val head = futureInvocations.peek()
+ if (head == null || head.timestamp > now) {
+ break
+ }
+ futureInvocations.poll()
+ }
+
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ val timer = futureQueue.peek() ?: break
+ val ctx = timer.ctx
+ val target = timer.target
+
+ assert(target >= now) { "Internal inconsistency: found update of the past" }
+
+ if (target > now) {
+ break
+ }
+
+ futureQueue.poll()
+
+ ctx.pruneTimers(now)
+
+ if (ctx.shouldUpdate(now)) {
+ ctx.doUpdate(now)
+ visited.add(ctx)
+ } else {
+ ctx.tryReschedule(now)
+ }
+ }
+
+ // Repeat execution of all immediate updates until the system has converged to a steady-state
+ // We have to take into account that the onConverge callback can also trigger new actions.
+ do {
+ // Execute all immediate updates
+ while (true) {
+ val ctx = queue.poll() ?: break
+
+ if (ctx.shouldUpdate(now)) {
+ ctx.doUpdate(now)
+ visited.add(ctx)
+ }
+ }
+
+ for (system in visited) {
+ system.onConverge(now)
+ }
+
+ visited.clear()
+ } while (queue.isNotEmpty())
+
+ // Schedule an engine invocation for the next update to occur.
+ val headTimer = futureQueue.peek()
+ if (headTimer != null) {
+ trySchedule(now, futureInvocations, headTimer.target)
+ }
+ }
+
+ /**
+ * Try to schedule an engine invocation at the specified [target].
+ *
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
+ */
+ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
+ while (true) {
+ val invocation = scheduled.peekFirst()
+ if (invocation == null || invocation.timestamp > target) {
+ // Case 2: A new timer was registered ahead of the other timers.
+ // Solution: Schedule a new scheduler invocation
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(
+ target - now,
+ {
+ try {
+ batchIndex++
+ runEngine(target)
+ } finally {
+ batchIndex--
+ }
+ },
+ context
+ )
+ scheduled.addFirst(Invocation(target, handle))
+ break
+ } else if (invocation.timestamp < target) {
+ // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted
+ // Solution: Cancel the next scheduler invocation
+ scheduled.pollFirst()
+
+ invocation.cancel()
+ } else {
+ break
+ }
+ }
+ }
+
+ /**
+ * A future engine invocation.
+ *
+ * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
+ * the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private data class Invocation(
+ @JvmField val timestamp: Long,
+ @JvmField val handle: DisposableHandle
+ ) {
+ /**
+ * Cancel the engine invocation.
+ */
+ fun cancel() = handle.dispose()
+ }
+
+ /**
+ * An update call for [ctx] that is scheduled for [target].
+ *
+ * This class represents an update in the future at [target] requested by [ctx].
+ */
+ class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> {
+ override fun compareTo(other: Timer): Int {
+ return target.compareTo(other.target)
+ }
+
+ override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
new file mode 100644
index 00000000..17b82391
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.FlowConsumer
+import org.opendc.simulator.flow.FlowCounters
+import org.opendc.simulator.flow.FlowSource
+import org.opendc.simulator.flow.interference.InterferenceKey
+
+/**
+ * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s.
+ */
+public interface FlowMultiplexer {
+ /**
+ * The inputs of the multiplexer that can be used to consume sources.
+ */
+ public val inputs: Set<FlowConsumer>
+
+ /**
+ * The outputs of the multiplexer over which the flows will be distributed.
+ */
+ public val outputs: Set<FlowConsumer>
+
+ /**
+ * The flow counters to track the flow metrics of all multiplexer inputs.
+ */
+ public val counters: FlowCounters
+
+ /**
+ * Create a new input on this multiplexer.
+ *
+ * @param key The key of the interference member to which the input belongs.
+ */
+ public fun newInput(key: InterferenceKey? = null): FlowConsumer
+
+ /**
+ * Remove [input] from this multiplexer.
+ */
+ public fun removeInput(input: FlowConsumer)
+
+ /**
+ * Add the specified [output] to the multiplexer.
+ */
+ public fun addOutput(output: FlowConsumer)
+
+ /**
+ * Clear all inputs and outputs from the switch.
+ */
+ public fun clear()
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
new file mode 100644
index 00000000..811d9460
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceKey
+import java.util.ArrayDeque
+
+/**
+ * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means
+ * that a single input is directly connected to an output and that the multiplexer can only support as many
+ * inputs as outputs.
+ *
+ * @param engine The [FlowEngine] driving the simulation.
+ */
+public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer {
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+
+ override val outputs: Set<FlowConsumer>
+ get() = _outputs
+ private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _availableOutputs = ArrayDeque<FlowForwarder>()
+
+ override val counters: FlowCounters = object : FlowCounters {
+ override val demand: Double
+ get() = _outputs.sumOf { it.counters.demand }
+ override val actual: Double
+ get() = _outputs.sumOf { it.counters.actual }
+ override val overcommit: Double
+ get() = _outputs.sumOf { it.counters.overcommit }
+ override val interference: Double
+ get() = _outputs.sumOf { it.counters.interference }
+
+ override fun reset() {
+ for (input in _outputs) {
+ input.counters.reset()
+ }
+ }
+
+ override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]"
+ }
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" }
+ val output = Input(forwarder)
+ _inputs += output
+ return output
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+
+ (input as Input).close()
+ }
+
+ override fun addOutput(output: FlowConsumer) {
+ if (output in outputs) {
+ return
+ }
+
+ val forwarder = FlowForwarder(engine)
+
+ _outputs += output
+ _availableOutputs += forwarder
+
+ output.startConsumer(object : FlowSource by forwarder {
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Exit) {
+ // De-register the output after it has finished
+ _outputs -= output
+ }
+
+ forwarder.onEvent(conn, now, event)
+ }
+ })
+ }
+
+ override fun clear() {
+ for (input in _outputs) {
+ input.cancel()
+ }
+ _outputs.clear()
+
+ // Inputs are implicitly cancelled by the output forwarders
+ _inputs.clear()
+ }
+
+ /**
+ * An input on the multiplexer.
+ */
+ private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder {
+ /**
+ * Close the input.
+ */
+ fun close() {
+ // We explicitly do not close the forwarder here in order to re-use it across input resources.
+ _inputs -= this
+ _availableOutputs += forwarder
+ }
+
+ override fun toString(): String = "ForwardingFlowMultiplexer.Input"
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
new file mode 100644
index 00000000..9735f121
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -0,0 +1,399 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.interference.InterferenceDomain
+import org.opendc.simulator.flow.interference.InterferenceKey
+import org.opendc.simulator.flow.internal.FlowCountersImpl
+import kotlin.math.max
+import kotlin.math.min
+
+/**
+ * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing.
+ *
+ * @param engine The [FlowEngine] to drive the flow simulation.
+ * @param parent The parent flow system of the multiplexer.
+ * @param interferenceDomain The interference domain of the multiplexer.
+ */
+public class MaxMinFlowMultiplexer(
+ private val engine: FlowEngine,
+ private val parent: FlowSystem? = null,
+ private val interferenceDomain: InterferenceDomain? = null
+) : FlowMultiplexer {
+ /**
+ * The inputs of the multiplexer.
+ */
+ override val inputs: Set<FlowConsumer>
+ get() = _inputs
+ private val _inputs = mutableSetOf<Input>()
+ private val _activeInputs = mutableListOf<Input>()
+
+ /**
+ * The outputs of the multiplexer.
+ */
+ override val outputs: Set<FlowConsumer>
+ get() = _outputs
+ private val _outputs = mutableSetOf<FlowConsumer>()
+ private val _activeOutputs = mutableListOf<Output>()
+
+ /**
+ * The flow counters of this multiplexer.
+ */
+ public override val counters: FlowCounters
+ get() = _counters
+ private val _counters = FlowCountersImpl()
+
+ /**
+ * The actual processing rate of the multiplexer.
+ */
+ private var _rate = 0.0
+
+ /**
+ * The demanded processing rate of the input.
+ */
+ private var _demand = 0.0
+
+ /**
+ * The capacity of the outputs.
+ */
+ private var _capacity = 0.0
+
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
+
+ override fun newInput(key: InterferenceKey?): FlowConsumer {
+ val provider = Input(_capacity, key)
+ _inputs.add(provider)
+ return provider
+ }
+
+ override fun addOutput(output: FlowConsumer) {
+ val consumer = Output(output)
+ if (_outputs.add(output)) {
+ _activeOutputs.add(consumer)
+ output.startConsumer(consumer)
+ }
+ }
+
+ override fun removeInput(input: FlowConsumer) {
+ if (!_inputs.remove(input)) {
+ return
+ }
+ // This cast should always succeed since only `Input` instances should be added to `_inputs`
+ (input as Input).close()
+ }
+
+ override fun clear() {
+ for (input in _activeOutputs) {
+ input.cancel()
+ }
+ _activeOutputs.clear()
+
+ for (output in _activeInputs) {
+ output.cancel()
+ }
+ _activeInputs.clear()
+ }
+
+ /**
+ * Converge the scheduler of the multiplexer.
+ */
+ private fun runScheduler(now: Long) {
+ if (_schedulerActive) {
+ return
+ }
+
+ _schedulerActive = true
+ try {
+ doSchedule(now)
+ } finally {
+ _schedulerActive = false
+ }
+ }
+
+ /**
+ * Schedule the inputs over the outputs.
+ */
+ private fun doSchedule(now: Long) {
+ val activeInputs = _activeInputs
+ val activeOutputs = _activeOutputs
+
+ // If there is no work yet, mark the inputs as idle.
+ if (activeInputs.isEmpty()) {
+ return
+ }
+
+ val capacity = _capacity
+ var availableCapacity = capacity
+
+ // Pull in the work of the outputs
+ val inputIterator = activeInputs.listIterator()
+ for (input in inputIterator) {
+ input.pull(now)
+
+ // Remove outputs that have finished
+ if (!input.isActive) {
+ inputIterator.remove()
+ }
+ }
+
+ var demand = 0.0
+
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeInputs.sort()
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ var remaining = activeInputs.size
+ for (input in activeInputs) {
+ val availableShare = availableCapacity / remaining--
+ val grantedRate = min(input.allowedRate, availableShare)
+
+ // Ignore empty sources
+ if (grantedRate <= 0.0) {
+ input.actualRate = 0.0
+ continue
+ }
+
+ input.actualRate = grantedRate
+ demand += input.limit
+ availableCapacity -= grantedRate
+ }
+
+ val rate = capacity - availableCapacity
+
+ _demand = demand
+ _rate = rate
+
+ // Sort all consumers by their capacity
+ activeOutputs.sort()
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (output in activeOutputs) {
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
+ }
+
+ /**
+ * Recompute the capacity of the multiplexer.
+ */
+ private fun updateCapacity() {
+ val newCapacity = _activeOutputs.sumOf(Output::capacity)
+
+ // No-op if the capacity is unchanged
+ if (_capacity == newCapacity) {
+ return
+ }
+
+ _capacity = newCapacity
+
+ for (input in _inputs) {
+ input.capacity = newCapacity
+ }
+ }
+
+ /**
+ * An internal [FlowConsumer] implementation for multiplexer inputs.
+ */
+ private inner class Input(capacity: Double, val key: InterferenceKey?) :
+ AbstractFlowConsumer(engine, capacity),
+ FlowConsumerLogic,
+ Comparable<Input> {
+ /**
+ * The requested limit.
+ */
+ @JvmField var limit: Double = 0.0
+
+ /**
+ * The actual processing speed.
+ */
+ @JvmField var actualRate: Double = 0.0
+
+ /**
+ * The processing rate that is allowed by the model constraints.
+ */
+ val allowedRate: Double
+ get() = min(capacity, limit)
+
+ /**
+ * A flag to indicate that the input is closed.
+ */
+ private var _isClosed: Boolean = false
+
+ /**
+ * The timestamp at which we received the last command.
+ */
+ private var _lastPull: Long = Long.MIN_VALUE
+
+ /**
+ * Close the input.
+ *
+ * This method is invoked when the user removes an input from the switch.
+ */
+ fun close() {
+ _isClosed = true
+ cancel()
+ }
+
+ /* AbstractFlowConsumer */
+ override fun createLogic(): FlowConsumerLogic = this
+
+ override fun start(ctx: FlowConsumerContext) {
+ check(!_isClosed) { "Cannot re-use closed input" }
+
+ _activeInputs += this
+ super.start(ctx)
+ }
+
+ /* FlowConsumerLogic */
+ override fun onPush(
+ ctx: FlowConsumerContext,
+ now: Long,
+ delta: Long,
+ rate: Double
+ ) {
+ doUpdateCounters(delta)
+
+ actualRate = 0.0
+ this.limit = rate
+ _lastPull = now
+
+ runScheduler(now)
+ }
+
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ parent?.onConverge(now)
+ }
+
+ override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ doUpdateCounters(delta)
+
+ limit = 0.0
+ actualRate = 0.0
+ _lastPull = now
+ }
+
+ /* Comparable */
+ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
+
+ /**
+ * Pull the source if necessary.
+ */
+ fun pull(now: Long) {
+ val ctx = ctx
+ if (ctx != null && _lastPull < now) {
+ ctx.flush()
+ }
+ }
+
+ /**
+ * Helper method to update the flow counters of the multiplexer.
+ */
+ private fun doUpdateCounters(delta: Long) {
+ if (delta <= 0L) {
+ return
+ }
+
+ // Compute the performance penalty due to flow interference
+ val perfScore = if (interferenceDomain != null) {
+ val load = _rate / capacity
+ interferenceDomain.apply(key, load)
+ } else {
+ 1.0
+ }
+
+ val deltaS = delta / 1000.0
+ val work = limit * deltaS
+ val actualWork = actualRate * deltaS
+ val remainingWork = work - actualWork
+
+ updateCounters(work, actualWork, remainingWork)
+
+ val distCounters = _counters
+ distCounters.demand += work
+ distCounters.actual += actualWork
+ distCounters.overcommit += remainingWork
+ distCounters.interference += actualWork * max(0.0, 1 - perfScore)
+ }
+ }
+
+ /**
+ * An internal [FlowSource] implementation for multiplexer outputs.
+ */
+ private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable<Output> {
+ /**
+ * The active [FlowConnection] of this source.
+ */
+ private var _ctx: FlowConnection? = null
+
+ /**
+ * The capacity of this output.
+ */
+ val capacity: Double
+ get() = _ctx?.capacity ?: 0.0
+
+ /**
+ * Push the specified rate to the consumer.
+ */
+ fun push(rate: Double) {
+ _ctx?.push(rate)
+ }
+
+ /**
+ * Cancel this output.
+ */
+ fun cancel() {
+ provider.cancel()
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ runScheduler(now)
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> {
+ assert(_ctx == null) { "Source running concurrently" }
+ _ctx = conn
+ updateCapacity()
+ }
+ FlowEvent.Exit -> {
+ _ctx = null
+ updateCapacity()
+ }
+ FlowEvent.Capacity -> updateCapacity()
+ else -> {}
+ }
+ }
+
+ override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
new file mode 100644
index 00000000..d9779c6a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowSource
+import kotlin.math.roundToLong
+
+/**
+ * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization].
+ */
+public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource {
+
+ init {
+ require(amount >= 0.0) { "Amount must be positive" }
+ require(utilization > 0.0) { "Utilization must be positive" }
+ }
+
+ private var remainingAmount = amount
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ val consumed = conn.rate * delta / 1000.0
+ val limit = conn.capacity * utilization
+
+ remainingAmount -= consumed
+
+ val duration = (remainingAmount / limit * 1000).roundToLong()
+
+ return if (duration > 0) {
+ conn.push(limit)
+ duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
new file mode 100644
index 00000000..b3191ad3
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+/**
+ * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to
+ * finish a pull, before proceeding its operation.
+ */
+public class FlowSourceBarrier(public val parties: Int) {
+ private var counter = 0
+
+ /**
+ * Enter the barrier and determine whether the caller is the last to reach the barrier.
+ *
+ * @return `true` if the caller is the last to reach the barrier, `false` otherwise.
+ */
+ public fun enter(): Boolean {
+ val last = ++counter == parties
+ if (last) {
+ counter = 0
+ return true
+ }
+ return false
+ }
+
+ /**
+ * Reset the barrier.
+ */
+ public fun reset() {
+ counter = 0
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
new file mode 100644
index 00000000..7fcc0405
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+import kotlin.math.min
+
+/**
+ * Helper class to expose an observable [speed] field describing the speed of the consumer.
+ */
+public class FlowSourceRateAdapter(
+ private val delegate: FlowSource,
+ private val callback: (Double) -> Unit = {}
+) : FlowSource by delegate {
+ /**
+ * The resource processing speed at this instant.
+ */
+ public var speed: Double = 0.0
+ private set(value) {
+ if (field != value) {
+ callback(value)
+ field = value
+ }
+ }
+
+ init {
+ callback(0.0)
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return delegate.onPull(conn, now, delta)
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ val oldSpeed = speed
+
+ delegate.onEvent(conn, now, event)
+
+ when (event) {
+ FlowEvent.Converge -> speed = conn.rate
+ FlowEvent.Capacity -> {
+ // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might
+ // need to update the current speed.
+ if (oldSpeed == speed) {
+ speed = min(conn.capacity, speed)
+ }
+ }
+ FlowEvent.Exit -> speed = 0.0
+ else -> {}
+ }
+ }
+
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ speed = 0.0
+
+ delegate.onFailure(conn, cause)
+ }
+
+ override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]"
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
new file mode 100644
index 00000000..4d3ae61a
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.opendc.simulator.flow.FlowConnection
+import org.opendc.simulator.flow.FlowEvent
+import org.opendc.simulator.flow.FlowSource
+
+/**
+ * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time.
+ */
+public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource {
+ private var _iterator: Iterator<Fragment>? = null
+ private var _nextTarget = Long.MIN_VALUE
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ // Check whether the trace fragment was fully consumed, otherwise wait until we have done so
+ val nextTarget = _nextTarget
+ if (nextTarget > now) {
+ return now - nextTarget
+ }
+
+ val iterator = checkNotNull(_iterator)
+ return if (iterator.hasNext()) {
+ val fragment = iterator.next()
+ _nextTarget = now + fragment.duration
+ conn.push(fragment.usage)
+ fragment.duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> {
+ check(_iterator == null) { "Source already running" }
+ _iterator = trace.iterator()
+ }
+ FlowEvent.Exit -> {
+ _iterator = null
+ }
+ else -> {}
+ }
+ }
+
+ /**
+ * A fragment of the tgrace.
+ */
+ public data class Fragment(val duration: Long, val usage: Double)
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
new file mode 100644
index 00000000..061ebea6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.*
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowConsumerContextImpl
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+
+/**
+ * A test suite for the [FlowConsumerContextImpl] class.
+ */
+class FlowConsumerContextTest {
+ @Test
+ fun testFlushWithoutCommand() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ engine.scheduleSync(engine.clock.millis(), context)
+ }
+
+ @Test
+ fun testIntermediateFlush() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = FixedFlowSource(1.0, 1.0)
+
+ val logic = spyk(object : FlowConsumerLogic {})
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 1.0
+
+ context.start()
+ delay(1) // Delay 1 ms to prevent hitting the fast path
+ engine.scheduleSync(engine.clock.millis(), context)
+
+ verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) }
+ }
+
+ @Test
+ fun testDoubleStart() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(0.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ assertThrows<IllegalStateException> {
+ context.start()
+ }
+ }
+
+ @Test
+ fun testIdempotentCapacityChange() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (now == 0L) {
+ conn.push(1.0)
+ 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+ context.capacity = 4200.0
+ context.start()
+ context.capacity = 4200.0
+
+ verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ }
+
+ @Test
+ fun testFailureNoInfiniteLoop() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ if (event == FlowEvent.Exit) throw IllegalStateException("onEvent")
+ }
+
+ override fun onFailure(conn: FlowConnection, cause: Throwable) {
+ throw IllegalStateException("onFailure")
+ }
+ })
+
+ val logic = object : FlowConsumerLogic {}
+
+ val context = FlowConsumerContextImpl(engine, consumer, logic)
+
+ context.start()
+
+ delay(1)
+
+ verify(exactly = 1) { consumer.onFailure(any(), any()) }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
new file mode 100644
index 00000000..cbc48a4e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.Assertions.*
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+
+/**
+ * A test suite for the [FlowForwarder] class.
+ */
+internal class FlowForwarderTest {
+ @Test
+ fun testCancelImmediately() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ })
+
+ forwarder.close()
+ source.cancel()
+ }
+
+ @Test
+ fun testCancel() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ launch { source.consume(forwarder) }
+
+ forwarder.consume(object : FlowSource {
+ var isFirst = true
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 10 * 1000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ })
+
+ forwarder.close()
+ source.cancel()
+ }
+
+ @Test
+ fun testState() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ }
+
+ assertFalse(forwarder.isActive)
+
+ forwarder.startConsumer(consumer)
+ assertTrue(forwarder.isActive)
+
+ assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) }
+
+ forwarder.cancel()
+ assertFalse(forwarder.isActive)
+
+ forwarder.close()
+ assertFalse(forwarder.isActive)
+ }
+
+ @Test
+ fun testCancelPendingDelegate() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+
+ val consumer = spyk(object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ })
+
+ forwarder.startConsumer(consumer)
+ forwarder.cancel()
+
+ verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ }
+
+ @Test
+ fun testCancelStartedDelegate() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ forwarder.cancel()
+
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ }
+
+ @Test
+ fun testCancelPropagation() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = spyk(FixedFlowSource(2000.0, 1.0))
+
+ source.startConsumer(forwarder)
+ yield()
+ forwarder.startConsumer(consumer)
+ yield()
+ source.cancel()
+
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) }
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) }
+ }
+
+ @Test
+ fun testExitPropagation() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine, isCoupled = true)
+ val source = FlowSink(engine, 2000.0)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+ }
+
+ source.startConsumer(forwarder)
+ forwarder.consume(consumer)
+ yield()
+
+ assertFalse(forwarder.isActive)
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
+
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
+ source.startConsumer(forwarder)
+
+ coroutineScope {
+ launch { forwarder.consume(consumer) }
+ delay(1000)
+ source.capacity = 0.5
+ }
+
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ }
+
+ @Test
+ fun testCounters() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val forwarder = FlowForwarder(engine)
+ val source = FlowSink(engine, 1.0)
+
+ val consumer = FixedFlowSource(2.0, 1.0)
+ source.startConsumer(forwarder)
+
+ forwarder.consume(consumer)
+
+ yield()
+
+ assertEquals(2.0, source.counters.actual)
+ assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" }
+ assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" }
+ assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" }
+ assertEquals(2000, clock.millis())
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
new file mode 100644
index 00000000..010a985e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt
@@ -0,0 +1,240 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow
+
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import kotlinx.coroutines.*
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+
+/**
+ * A test suite for the [FlowSink] class.
+ */
+internal class FlowSinkTest {
+ @Test
+ fun testSpeed() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(4200.0, 1.0)
+
+ val res = mutableListOf<Double>()
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ }
+
+ @Test
+ fun testAdjustCapacity() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(engine, 1.0)
+
+ val consumer = spyk(FixedFlowSource(2.0, 1.0))
+
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ delay(1000)
+ provider.capacity = 0.5
+ }
+ assertEquals(3000, clock.millis())
+ verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) }
+ }
+
+ @Test
+ fun testSpeedLimit() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 2.0)
+
+ val res = mutableListOf<Double>()
+ val adapter = FlowSourceRateAdapter(consumer, res::add)
+
+ provider.consume(adapter)
+
+ assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" }
+ }
+
+ /**
+ * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or
+ * [FlowSource.onPull].
+ */
+ @Test
+ fun testIntermediateInterrupt() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ conn.close()
+ return Long.MAX_VALUE
+ }
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ conn.pull()
+ }
+ }
+
+ provider.consume(consumer)
+ }
+
+ @Test
+ fun testInterrupt() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+ lateinit var resCtx: FlowConnection
+
+ val consumer = object : FlowSource {
+ var isFirst = true
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> resCtx = conn
+ else -> {}
+ }
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 4000
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ launch {
+ yield()
+ resCtx.pull()
+ }
+ provider.consume(consumer)
+
+ assertEquals(0, clock.millis())
+ }
+
+ @Test
+ fun testFailure() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = mockk<FlowSource>(relaxUnitFun = true)
+ every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) }
+ .throws(IllegalStateException())
+
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ }
+
+ @Test
+ fun testExceptionPropagationOnNext() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ var isFirst = true
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ 1000
+ } else {
+ throw IllegalStateException()
+ }
+ }
+ }
+
+ assertThrows<IllegalStateException> {
+ provider.consume(consumer)
+ }
+ }
+
+ @Test
+ fun testConcurrentConsumption() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 1.0)
+
+ assertThrows<IllegalStateException> {
+ coroutineScope {
+ launch { provider.consume(consumer) }
+ provider.consume(consumer)
+ }
+ }
+ }
+
+ @Test
+ fun testCancelDuringConsumption() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = FixedFlowSource(capacity, 1.0)
+
+ launch { provider.consume(consumer) }
+ delay(500)
+ provider.cancel()
+
+ yield()
+
+ assertEquals(500, clock.millis())
+ }
+
+ @Test
+ fun testInfiniteSleep() {
+ assertThrows<IllegalStateException> {
+ runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+ val capacity = 4200.0
+ val provider = FlowSink(engine, capacity)
+
+ val consumer = object : FlowSource {
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE
+ }
+
+ provider.consume(consumer)
+ }
+ }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
new file mode 100644
index 00000000..b503087e
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt
@@ -0,0 +1,157 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertAll
+import org.junit.jupiter.api.assertThrows
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.*
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.FlowSourceRateAdapter
+import org.opendc.simulator.flow.source.TraceFlowSource
+
+/**
+ * Test suite for the [ForwardingFlowMultiplexer] class.
+ */
+internal class SimResourceSwitchExclusiveTest {
+ /**
+ * Test a trace workload.
+ */
+ @Test
+ fun testTrace() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val speed = mutableListOf<Double>()
+
+ val duration = 5 * 60L
+ val workload =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+ val forwarder = FlowForwarder(engine)
+ val adapter = FlowSourceRateAdapter(forwarder, speed::add)
+ source.startConsumer(adapter)
+ switch.addOutput(forwarder)
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+
+ assertAll(
+ { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } },
+ { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } }
+ )
+ }
+
+ /**
+ * Test runtime workload on hypervisor.
+ */
+ @Test
+ fun testRuntimeWorkload() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = FixedFlowSource(duration * 3.2, 1.0)
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ switch.addOutput(source)
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+
+ assertEquals(duration, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test two workloads running sequentially.
+ */
+ @Test
+ fun testTwoWorkloads() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L * 1000
+ val workload = object : FlowSource {
+ var isFirst = true
+
+ override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {
+ when (event) {
+ FlowEvent.Start -> isFirst = true
+ else -> {}
+ }
+ }
+
+ override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
+ return if (isFirst) {
+ isFirst = false
+ conn.push(1.0)
+ duration
+ } else {
+ conn.close()
+ Long.MAX_VALUE
+ }
+ }
+ }
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ switch.addOutput(source)
+
+ val provider = switch.newInput()
+ provider.consume(workload)
+ yield()
+ provider.consume(workload)
+ assertEquals(duration * 2, clock.millis()) { "Took enough time" }
+ }
+
+ /**
+ * Test concurrent workloads on the machine.
+ */
+ @Test
+ fun testConcurrentWorkloadFails() = runBlockingSimulation {
+ val engine = FlowEngineImpl(coroutineContext, clock)
+
+ val switch = ForwardingFlowMultiplexer(engine)
+ val source = FlowSink(engine, 3200.0)
+
+ switch.addOutput(source)
+
+ switch.newInput()
+ assertThrows<IllegalStateException> { switch.newInput() }
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
new file mode 100644
index 00000000..089a8d78
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.mux
+
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.yield
+import org.junit.jupiter.api.*
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+import org.opendc.simulator.flow.source.FixedFlowSource
+import org.opendc.simulator.flow.source.TraceFlowSource
+
+/**
+ * Test suite for the [FlowMultiplexer] implementations
+ */
+internal class SimResourceSwitchMaxMinTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val switch = MaxMinFlowMultiplexer(scheduler)
+
+ val sources = List(2) { FlowSink(scheduler, 2000.0) }
+ sources.forEach { switch.addOutput(it) }
+
+ val provider = switch.newInput()
+ val consumer = FixedFlowSource(2000.0, 1.0)
+
+ try {
+ provider.consume(consumer)
+ yield()
+ } finally {
+ switch.clear()
+ }
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with a single VM.
+ */
+ @Test
+ fun testOvercommittedSingle() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L
+ val workload =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val provider = switch.newInput()
+
+ try {
+ switch.addOutput(FlowSink(scheduler, 3200.0))
+ provider.consume(workload)
+ yield()
+ } finally {
+ switch.clear()
+ }
+
+ assertAll(
+ { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") },
+ { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+
+ /**
+ * Test overcommitting of resources via the hypervisor with two VMs.
+ */
+ @Test
+ fun testOvercommittedDual() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+
+ val duration = 5 * 60L
+ val workloadA =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3500.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 183.0)
+ ),
+ )
+ val workloadB =
+ TraceFlowSource(
+ sequenceOf(
+ TraceFlowSource.Fragment(duration * 1000, 28.0),
+ TraceFlowSource.Fragment(duration * 1000, 3100.0),
+ TraceFlowSource.Fragment(duration * 1000, 0.0),
+ TraceFlowSource.Fragment(duration * 1000, 73.0)
+ )
+ )
+
+ val switch = MaxMinFlowMultiplexer(scheduler)
+ val providerA = switch.newInput()
+ val providerB = switch.newInput()
+
+ try {
+ switch.addOutput(FlowSink(scheduler, 3200.0))
+
+ coroutineScope {
+ launch { providerA.consume(workloadA) }
+ providerB.consume(workloadB)
+ }
+
+ yield()
+ } finally {
+ switch.clear()
+ }
+ assertAll(
+ { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") },
+ { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") },
+ { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") },
+ { assertEquals(1200000, clock.millis()) }
+ )
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
new file mode 100644
index 00000000..8396d346
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.source
+
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.opendc.simulator.core.runBlockingSimulation
+import org.opendc.simulator.flow.FlowSink
+import org.opendc.simulator.flow.consume
+import org.opendc.simulator.flow.internal.FlowEngineImpl
+
+/**
+ * A test suite for the [FixedFlowSource] class.
+ */
+internal class FixedFlowSourceTest {
+ @Test
+ fun testSmoke() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
+
+ val consumer = FixedFlowSource(1.0, 1.0)
+
+ provider.consume(consumer)
+ assertEquals(1000, clock.millis())
+ }
+
+ @Test
+ fun testUtilization() = runBlockingSimulation {
+ val scheduler = FlowEngineImpl(coroutineContext, clock)
+ val provider = FlowSink(scheduler, 1.0)
+
+ val consumer = FixedFlowSource(1.0, 0.5)
+
+ provider.consume(consumer)
+ assertEquals(2000, clock.millis())
+ }
+}