From 4cc1d40d421c8736f8b21b360b61d6b065158b7a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 29 Sep 2021 23:56:16 +0200 Subject: refactor(simulator): Migrate to flow-based simulation This change renames the `opendc-simulator-resources` module into the `opendc-simulator-flow` module to indicate that the core simulation model of OpenDC is based around modelling and simulating flows. Previously, the distinction between resource consumer and provider, and input and output caused some confusion. By switching to a flow-based model, this distinction is now clear (as in, the water flows from source to consumer/sink). --- .../opendc-simulator-flow/build.gradle.kts | 38 ++ .../org/opendc/simulator/flow/FlowBenchmarks.kt | 140 ++++++++ .../opendc/simulator/flow/AbstractFlowConsumer.kt | 143 ++++++++ .../org/opendc/simulator/flow/FlowConnection.kt | 60 ++++ .../org/opendc/simulator/flow/FlowConsumer.kt | 109 ++++++ .../opendc/simulator/flow/FlowConsumerContext.kt | 45 +++ .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 56 +++ .../org/opendc/simulator/flow/FlowCounters.kt | 53 +++ .../kotlin/org/opendc/simulator/flow/FlowEngine.kt | 95 +++++ .../kotlin/org/opendc/simulator/flow/FlowEvent.kt | 48 +++ .../org/opendc/simulator/flow/FlowForwarder.kt | 217 +++++++++++ .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 61 ++++ .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 58 +++ .../kotlin/org/opendc/simulator/flow/FlowSystem.kt | 43 +++ .../flow/interference/InterferenceDomain.kt | 19 + .../simulator/flow/interference/InterferenceKey.kt | 28 ++ .../flow/internal/FlowConsumerContextImpl.kt | 356 ++++++++++++++++++ .../simulator/flow/internal/FlowCountersImpl.kt | 46 +++ .../simulator/flow/internal/FlowEngineImpl.kt | 297 +++++++++++++++ .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 70 ++++ .../flow/mux/ForwardingFlowMultiplexer.kt | 127 +++++++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 399 +++++++++++++++++++++ .../simulator/flow/source/FixedFlowSource.kt | 57 +++ .../simulator/flow/source/FlowSourceBarrier.kt | 52 +++ .../simulator/flow/source/FlowSourceRateAdapter.kt | 82 +++++ .../simulator/flow/source/TraceFlowSource.kt | 72 ++++ .../simulator/flow/FlowConsumerContextTest.kt | 152 ++++++++ .../org/opendc/simulator/flow/FlowForwarderTest.kt | 222 ++++++++++++ .../org/opendc/simulator/flow/FlowSinkTest.kt | 240 +++++++++++++ .../flow/mux/SimResourceSwitchExclusiveTest.kt | 157 ++++++++ .../flow/mux/SimResourceSwitchMaxMinTest.kt | 147 ++++++++ .../simulator/flow/source/FixedFlowSourceTest.kt | 57 +++ 32 files changed, 3746 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/build.gradle.kts create mode 100644 opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt (limited to 'opendc-simulator/opendc-simulator-flow') diff --git a/opendc-simulator/opendc-simulator-flow/build.gradle.kts b/opendc-simulator/opendc-simulator-flow/build.gradle.kts new file mode 100644 index 00000000..5a956fee --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/build.gradle.kts @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +description = "High-performance flow simulator" + +plugins { + `kotlin-library-conventions` + `testing-conventions` + `jacoco-conventions` + `benchmark-conventions` +} + +dependencies { + api(platform(projects.opendcPlatform)) + api(libs.kotlinx.coroutines) + implementation(projects.opendcUtils) + + testImplementation(projects.opendcSimulator.opendcSimulatorCore) +} diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt new file mode 100644 index 00000000..4834f10f --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import org.opendc.simulator.core.SimulationCoroutineScope +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow.source.TraceFlowSource +import org.openjdk.jmh.annotations.* +import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class FlowBenchmarks { + private lateinit var scope: SimulationCoroutineScope + private lateinit var engine: FlowEngine + + @Setup + fun setUp() { + scope = SimulationCoroutineScope() + engine = FlowEngine(scope.coroutineContext, scope.clock) + } + + @State(Scope.Thread) + class Workload { + lateinit var trace: Sequence + + @Setup + fun setUp() { + val random = ThreadLocalRandom.current() + val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + trace = entries.asSequence() + } + } + + @Benchmark + fun benchmarkSink(state: Workload) { + return scope.runBlockingSimulation { + val provider = FlowSink(engine, 4200.0) + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkForward(state: Workload) { + return scope.runBlockingSimulation { + val provider = FlowSink(engine, 4200.0) + val forwarder = FlowForwarder(engine) + provider.startConsumer(forwarder) + return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkMuxMaxMinSingleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = MaxMinFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkMuxMaxMinTripleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = MaxMinFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + repeat(3) { + launch { + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) + } + } + } + } + + @Benchmark + fun benchmarkMuxExclusiveSingleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = ForwardingFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkMuxExclusiveTripleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = ForwardingFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + repeat(2) { + launch { + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) + } + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt new file mode 100644 index 00000000..c8092082 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import org.opendc.simulator.flow.internal.FlowCountersImpl + +/** + * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. + */ +public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { + /** + * A flag to indicate that the flow consumer is active. + */ + public override val isActive: Boolean + get() = ctx != null + + /** + * The capacity of the consumer. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** + * The current processing rate of the consumer. + */ + public override val rate: Double + get() = ctx?.rate ?: 0.0 + + /** + * The flow processing rate demand at this instant. + */ + public override val demand: Double + get() = ctx?.demand ?: 0.0 + + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + /** + * The [FlowConsumerContext] that is currently running. + */ + protected var ctx: FlowConsumerContext? = null + private set + + /** + * Construct the [FlowConsumerLogic] instance for a new source. + */ + protected abstract fun createLogic(): FlowConsumerLogic + + /** + * Start the specified [FlowConsumerContext]. + */ + protected open fun start(ctx: FlowConsumerContext) { + ctx.start() + } + + /** + * The previous demand for the consumer. + */ + private var previousDemand = 0.0 + + /** + * Update the counters of the flow consumer. + */ + protected fun updateCounters(ctx: FlowConnection, delta: Long) { + val demand = previousDemand + previousDemand = ctx.demand + + if (delta <= 0) { + return + } + + val counters = _counters + val deltaS = delta / 1000.0 + val work = demand * deltaS + val actualWork = ctx.rate * deltaS + val remainingWork = work - actualWork + + counters.demand += work + counters.actual += actualWork + counters.overcommit += remainingWork + } + + /** + * Update the counters of the flow consumer. + */ + protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + val counters = _counters + counters.demand += demand + counters.actual += actual + counters.overcommit += overcommit + } + + final override fun startConsumer(source: FlowSource) { + check(ctx == null) { "Consumer is in invalid state" } + val ctx = engine.newContext(source, createLogic()) + + ctx.capacity = capacity + this.ctx = ctx + + start(ctx) + } + + final override fun pull() { + ctx?.pull() + } + + final override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.close() + } + } + + override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt new file mode 100644 index 00000000..fa833961 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * An active connection between a [FlowSource] and [FlowConsumer]. + */ +public interface FlowConnection : AutoCloseable { + /** + * The capacity of the connection. + */ + public val capacity: Double + + /** + * The flow rate over the connection. + */ + public val rate: Double + + /** + * The flow demand of the source. + */ + public val demand: Double + + /** + * Pull the source. + */ + public fun pull() + + /** + * Push the given flow [rate] over this connection. + * + * @param rate The rate of the flow to push. + */ + public fun push(rate: Double) + + /** + * Disconnect the consumer from its source. + */ + public override fun close() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt new file mode 100644 index 00000000..3a6e2e97 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * A consumer of a [FlowSource]. + */ +public interface FlowConsumer { + /** + * A flag to indicate that the consumer is currently consuming a [FlowSource]. + */ + public val isActive: Boolean + + /** + * The flow capacity of this consumer. + */ + public val capacity: Double + + /** + * The current flow rate of the consumer. + */ + public val rate: Double + + /** + * The current flow demand. + */ + public val demand: Double + + /** + * The flow counters to track the flow metrics of the consumer. + */ + public val counters: FlowCounters + + /** + * Start consuming the specified [source]. + * + * @throws IllegalStateException if the consumer is already active. + */ + public fun startConsumer(source: FlowSource) + + /** + * Ask the consumer to pull its source. + * + * If the consumer is not active, this operation will be a no-op. + */ + public fun pull() + + /** + * Disconnect the consumer from its source. + * + * If the consumer is not active, this operation will be a no-op. + */ + public fun cancel() +} + +/** + * Consume the specified [source] and suspend execution until the source is fully consumed or failed. + */ +public suspend fun FlowConsumer.consume(source: FlowSource) { + return suspendCancellableCoroutine { cont -> + startConsumer(object : FlowSource by source { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + source.onEvent(conn, now, event) + + if (event == FlowEvent.Exit && !cont.isCompleted) { + cont.resume(Unit) + } + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + try { + source.onFailure(conn, cause) + cont.resumeWithException(cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + cont.resumeWithException(e) + } + } + + override fun toString(): String = "SuspendingFlowSource" + }) + + cont.invokeOnCancellation { cancel() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt new file mode 100644 index 00000000..75b2d25b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A controllable [FlowConnection]. + * + * This interface is used by [FlowConsumer]s to control the connection between it and the source. + */ +public interface FlowConsumerContext : FlowConnection { + /** + * The capacity of the connection. + */ + public override var capacity: Double + + /** + * Start the flow over the connection. + */ + public fun start() + + /** + * Synchronously flush the changes of the connection. + */ + public fun flush() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt new file mode 100644 index 00000000..c69cb17e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A collection of callbacks associated with a [FlowConsumer]. + */ +public interface FlowConsumerLogic { + /** + * This method is invoked when a [FlowSource] changes the rate of flow to this consumer. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + * @param rate The requested processing rate of the source. + */ + public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} + + /** + * This method is invoked when the flow graph has converged into a steady-state system. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. + */ + public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} + + /** + * This method is invoked when the [FlowSource] is completed. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the provider finished. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + */ + public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {} +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt new file mode 100644 index 00000000..e15d7643 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * An interface that tracks cumulative counts of the flow accumulation over a stage. + */ +public interface FlowCounters { + /** + * The accumulated flow that a source wanted to push over the connection. + */ + public val demand: Double + + /** + * The accumulated flow that was actually transferred over the connection. + */ + public val actual: Double + + /** + * The accumulated flow that could not be transferred over the connection. + */ + public val overcommit: Double + + /** + * The accumulated flow lost due to interference between sources. + */ + public val interference: Double + + /** + * Reset the flow counters. + */ + public fun reset() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt new file mode 100644 index 00000000..65224827 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import org.opendc.simulator.flow.internal.FlowEngineImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s. + * + * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public interface FlowEngine { + /** + * The virtual [Clock] associated with this engine. + */ + public val clock: Clock + + /** + * Create a new [FlowConsumerContext] with the given [provider]. + * + * @param consumer The consumer logic. + * @param provider The logic of the resource provider. + */ + public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext + + /** + * Start batching the execution of resource updates until [popBatch] is called. + * + * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs + * simultaneously) in a single state update. + * + * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the + * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called + * the same amount of times. To simplify batching, see [batch]. + */ + public fun pushBatch() + + /** + * Stop the batching of resource updates and run the interpreter on the batch. + * + * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. + */ + public fun popBatch() + + public companion object { + /** + * Construct a new [FlowEngine] implementation. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ + @JvmStatic + @JvmName("create") + public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine { + return FlowEngineImpl(context, clock) + } + } +} + +/** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ +public inline fun FlowEngine.batch(block: () -> Unit) { + try { + pushBatch() + block() + } finally { + popBatch() + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt new file mode 100644 index 00000000..14c85183 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A flow event that is communicated to a [FlowSource]. + */ +public enum class FlowEvent { + /** + * This event is emitted to the source when it has started. + */ + Start, + + /** + * This event is emitted to the source when it is stopped. + */ + Exit, + + /** + * This event is emitted to the source when the system has converged into a steady state. + */ + Converge, + + /** + * This event is emitted to the source when the capacity of the consumer has changed. + */ + Capacity, +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt new file mode 100644 index 00000000..2074033e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import org.opendc.simulator.flow.internal.FlowCountersImpl + +/** + * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. + * + * @param engine The [FlowEngine] the forwarder runs in. + * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. + */ +public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { + /** + * The delegate [FlowSource]. + */ + private var delegate: FlowSource? = null + + /** + * A flag to indicate that the delegate was started. + */ + private var hasDelegateStarted: Boolean = false + + /** + * The exposed [FlowConnection]. + */ + private val _ctx = object : FlowConnection { + override val capacity: Double + get() = _innerCtx?.capacity ?: 0.0 + + override val demand: Double + get() = _innerCtx?.demand ?: 0.0 + + override val rate: Double + get() = _innerCtx?.rate ?: 0.0 + + override fun pull() { + _innerCtx?.pull() + } + + override fun push(rate: Double) { + _innerCtx?.push(rate) + _demand = rate + } + + override fun close() { + val delegate = checkNotNull(delegate) { "Delegate not active" } + + if (isCoupled) + _innerCtx?.close() + else + _innerCtx?.push(0.0) + + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + } + } + + /** + * The [FlowConnection] in which the forwarder runs. + */ + private var _innerCtx: FlowConnection? = null + + override val isActive: Boolean + get() = delegate != null + + override val capacity: Double + get() = _ctx.capacity + + override val rate: Double + get() = _ctx.rate + + override val demand: Double + get() = _ctx.demand + + override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + override fun startConsumer(source: FlowSource) { + check(delegate == null) { "Forwarder already active" } + + delegate = source + + // Pull to replace the source + pull() + } + + override fun pull() { + _ctx.pull() + } + + override fun cancel() { + val delegate = delegate + val ctx = _innerCtx + + if (delegate != null) { + this.delegate = null + + if (ctx != null) { + delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit) + } + } + } + + override fun close() { + val ctx = _innerCtx + + if (ctx != null) { + this._innerCtx = null + ctx.pull() + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val delegate = delegate + + if (!hasDelegateStarted) { + start() + } + + updateCounters(conn, delta) + + return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + _innerCtx = conn + } + FlowEvent.Exit -> { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onEvent(this._ctx, now, FlowEvent.Exit) + } + } + else -> delegate?.onEvent(this._ctx, now, event) + } + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onFailure(this._ctx, cause) + } + } + + /** + * Start the delegate. + */ + private fun start() { + val delegate = delegate ?: return + delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start) + + hasDelegateStarted = true + } + + /** + * Reset the delegate. + */ + private fun reset() { + delegate = null + hasDelegateStarted = false + } + + /** + * The requested flow rate. + */ + private var _demand: Double = 0.0 + + /** + * Update the flow counters for the transformer. + */ + private fun updateCounters(ctx: FlowConnection, delta: Long) { + if (delta <= 0) { + return + } + + val counters = _counters + val deltaS = delta / 1000.0 + val work = _demand * deltaS + val actualWork = ctx.rate * deltaS + counters.demand += work + counters.actual += actualWork + counters.overcommit += (work - actualWork) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt new file mode 100644 index 00000000..fb6ca85d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A [FlowSink] represents a sink with a fixed capacity. + * + * @param initialCapacity The initial capacity of the resource. + * @param engine The engine that is used for driving the flow simulation. + * @param parent The parent flow system. + */ +public class FlowSink( + private val engine: FlowEngine, + initialCapacity: Double, + private val parent: FlowSystem? = null +) : AbstractFlowConsumer(engine, initialCapacity) { + + override fun createLogic(): FlowConsumerLogic { + return object : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + updateCounters(ctx, delta) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + updateCounters(ctx, delta) + cancel() + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + } + } + + override fun toString(): String = "FlowSink[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt new file mode 100644 index 00000000..077b4d38 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A source of flow that is consumed by a [FlowConsumer]. + * + * Implementations of this interface should be considered stateful and must be assumed not to be re-usable + * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. + */ +public interface FlowSource { + /** + * This method is invoked when the source is pulled. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the pull is occurring. + * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. + * @return The duration after which the resource consumer should be pulled again. + */ + public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long + + /** + * This method is invoked when an event has occurred. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the event is occurring. + * @param event The event that has occurred. + */ + public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} + + /** + * This method is invoked when the source throws an exception. + * + * @param conn The connection between the source and consumer. + * @param cause The cause of the failure. + */ + public fun onFailure(conn: FlowConnection, cause: Throwable) {} +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt new file mode 100644 index 00000000..db6aa69f --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A system of possible multiple sub-resources. + * + * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the + * resource provider. + */ +public interface FlowSystem { + /** + * The parent system to which this system belongs or `null` if it has no parent. + */ + public val parent: FlowSystem? + + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param timestamp The timestamp at which the system converged. + */ + public fun onConverge(timestamp: Long) +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt new file mode 100644 index 00000000..aa2713b6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt @@ -0,0 +1,19 @@ +package org.opendc.simulator.flow.interference + +import org.opendc.simulator.flow.FlowSource + +/** + * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur + * performance variability due to operating on the same resource and therefore causing interference. + */ +public interface InterferenceDomain { + /** + * Compute the performance score of a participant in this interference domain. + * + * @param key The participant to obtain the score of or `null` if the participant has no key. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the resource consumer, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(key: InterferenceKey?, load: Double): Double +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt new file mode 100644 index 00000000..d28ebde5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.interference + +/** + * A key that uniquely identifies a participant of an interference domain. + */ +public interface InterferenceKey diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt new file mode 100644 index 00000000..9f3afc4d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -0,0 +1,356 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import org.opendc.simulator.flow.* +import java.util.ArrayDeque +import kotlin.math.max +import kotlin.math.min + +/** + * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. + */ +internal class FlowConsumerContextImpl( + private val engine: FlowEngineImpl, + private val source: FlowSource, + private val logic: FlowConsumerLogic +) : FlowConsumerContext { + /** + * The clock to track simulation time. + */ + private val _clock = engine.clock + + /** + * The capacity of the resource. + */ + override var capacity: Double = 0.0 + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** + * A flag to indicate the state of the context. + */ + private var _state = State.Pending + + /** + * The current processing speed of the resource. + */ + override val rate: Double + get() = _rate + private var _rate = 0.0 + + /** + * The current resource processing demand. + */ + override val demand: Double + get() = _limit + + /** + * The current state of the resource context. + */ + private var _limit: Double = 0.0 + private var _activeLimit: Double = 0.0 + private var _deadline: Long = Long.MIN_VALUE + + /** + * A flag to indicate that an update is active. + */ + private var _updateActive = false + + /** + * The update flag indicating why the update was triggered. + */ + private var _flag: Int = 0 + + /** + * The timestamp of calls to the callbacks. + */ + private var _lastUpdate: Long = Long.MIN_VALUE + private var _lastConvergence: Long = Long.MAX_VALUE + + /** + * The timers at which the context is scheduled to be interrupted. + */ + private val _timers: ArrayDeque = ArrayDeque() + + override fun start() { + check(_state == State.Pending) { "Consumer is already started" } + engine.batch { + source.onEvent(this, _clock.millis(), FlowEvent.Start) + _state = State.Active + pull() + } + } + + override fun close() { + if (_state == State.Stopped) { + return + } + + engine.batch { + _state = State.Stopped + if (!_updateActive) { + val now = _clock.millis() + val delta = max(0, now - _lastUpdate) + doStop(now, delta) + + // FIX: Make sure the context converges + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(_clock.millis()) + } + } + } + + override fun pull() { + if (_state == State.Stopped) { + return + } + + _flag = _flag or FLAG_INTERRUPT + scheduleUpdate(_clock.millis()) + } + + override fun flush() { + if (_state == State.Stopped) { + return + } + + engine.scheduleSync(_clock.millis(), this) + } + + override fun push(rate: Double) { + if (_limit == rate) { + return + } + + _limit = rate + + // Invalidate only if the active limit is change and no update is active + // If an update is active, it will already get picked up at the end of the update + if (_activeLimit != rate && !_updateActive) { + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(_clock.millis()) + } + } + + /** + * Determine whether the state of the resource context should be updated. + */ + fun shouldUpdate(timestamp: Long): Boolean { + // Either the resource context is flagged or there is a pending update at this timestamp + return _flag != 0 || _limit != _activeLimit || _deadline == timestamp + } + + /** + * Update the state of the resource context. + */ + fun doUpdate(now: Long) { + val oldState = _state + if (oldState != State.Active) { + return + } + + val lastUpdate = _lastUpdate + + _lastUpdate = now + _updateActive = true + + val delta = max(0, now - lastUpdate) + + try { + val duration = source.onPull(this, now, delta) + val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration + + // Reset update flags + _flag = 0 + + // Check whether the state has changed after [consumer.onNext] + when (_state) { + State.Active -> { + logic.onPush(this, now, delta, _limit) + + // Schedule an update at the new deadline + scheduleUpdate(now, newDeadline) + } + State.Stopped -> doStop(now, delta) + State.Pending -> throw IllegalStateException("Illegal transition to pending state") + } + + // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value + val newLimit = _limit + + // Flush the changes to the flow + _activeLimit = newLimit + _deadline = newDeadline + _rate = min(capacity, newLimit) + } catch (cause: Throwable) { + doFail(now, delta, cause) + } finally { + _updateActive = false + } + } + + /** + * Prune the elapsed timers from this context. + */ + fun pruneTimers(now: Long) { + val timers = _timers + while (true) { + val head = timers.peek() + if (head == null || head.target > now) { + break + } + timers.poll() + } + } + + /** + * Try to re-schedule the resource context in case it was skipped. + */ + fun tryReschedule(now: Long) { + val deadline = _deadline + if (deadline > now && deadline != Long.MAX_VALUE) { + scheduleUpdate(now, deadline) + } + } + + /** + * This method is invoked when the system converges into a steady state. + */ + fun onConverge(timestamp: Long) { + val delta = max(0, timestamp - _lastConvergence) + _lastConvergence = timestamp + + try { + if (_state == State.Active) { + source.onEvent(this, timestamp, FlowEvent.Converge) + } + + logic.onConverge(this, timestamp, delta) + } catch (cause: Throwable) { + doFail(timestamp, max(0, timestamp - _lastUpdate), cause) + } + } + + override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" + + /** + * Stop the resource context. + */ + private fun doStop(now: Long, delta: Long) { + try { + source.onEvent(this, now, FlowEvent.Exit) + logic.onFinish(this, now, delta) + } catch (cause: Throwable) { + doFail(now, delta, cause) + } finally { + _deadline = Long.MAX_VALUE + _limit = 0.0 + } + } + + /** + * Fail the resource consumer. + */ + private fun doFail(now: Long, delta: Long, cause: Throwable) { + try { + source.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + logic.onFinish(this, now, delta) + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (_state != State.Active) { + return + } + + engine.batch { + // Inform the consumer of the capacity change. This might already trigger an interrupt. + source.onEvent(this, _clock.millis(), FlowEvent.Capacity) + + pull() + } + } + + /** + * Schedule an update for this resource context. + */ + private fun scheduleUpdate(now: Long) { + engine.scheduleImmediate(now, this) + } + + /** + * Schedule a delayed update for this resource context. + */ + private fun scheduleUpdate(now: Long, target: Long) { + val timers = _timers + if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { + timers.addFirst(engine.scheduleDelayed(now, this, target)) + } + } + + /** + * The state of a resource context. + */ + private enum class State { + /** + * The resource context is pending and the resource is waiting to be consumed. + */ + Pending, + + /** + * The resource context is active and the resource is currently being consumed. + */ + Active, + + /** + * The resource context is stopped and the resource cannot be consumed anymore. + */ + Stopped + } + + /** + * A flag to indicate that the context should be invalidated. + */ + private val FLAG_INVALIDATE = 0b01 + + /** + * A flag to indicate that the context should be interrupted. + */ + private val FLAG_INTERRUPT = 0b10 +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt new file mode 100644 index 00000000..141d335d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import org.opendc.simulator.flow.FlowCounters + +/** + * Mutable implementation of the [FlowCounters] interface. + */ +internal class FlowCountersImpl : FlowCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + override var interference: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + interference = 0.0 + } + + override fun toString(): String { + return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt new file mode 100644 index 00000000..1a50da2c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Runnable +import org.opendc.simulator.flow.* +import java.time.Clock +import java.util.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * Internal implementation of the [FlowEngine] interface. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The queue of connection updates that are scheduled for immediate execution. + */ + private val queue = ArrayDeque() + + /** + * A priority queue containing the connection updates to be scheduled in the future. + */ + private val futureQueue = PriorityQueue() + + /** + * The stack of engine invocations to occur in the future. + */ + private val futureInvocations = ArrayDeque() + + /** + * The systems that have been visited during the engine cycle. + */ + private val visited = linkedSetOf() + + /** + * The index in the batch stack. + */ + private var batchIndex = 0 + + /** + * A flag to indicate that the engine is currently active. + */ + private val isRunning: Boolean + get() = batchIndex > 0 + + /** + * Update the specified [ctx] synchronously. + */ + fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { + ctx.doUpdate(now) + visited.add(ctx) + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (isRunning) { + return + } + + try { + batchIndex++ + runEngine(now) + } finally { + batchIndex-- + } + } + + /** + * Enqueue the specified [ctx] to be updated immediately during the active engine cycle. + * + * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. In case no engine is currently active, the engine will be started. + */ + fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) { + queue.add(ctx) + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (isRunning) { + return + } + + try { + batchIndex++ + runEngine(now) + } finally { + batchIndex-- + } + } + + /** + * Schedule the engine to run at [target] to update the flow contexts. + * + * This method will override earlier calls to this method for the same [ctx]. + * + * @param now The current virtual timestamp. + * @param ctx The flow context to which the event applies. + * @param target The timestamp when the interrupt should happen. + */ + fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer { + val futureQueue = futureQueue + + require(target >= now) { "Timestamp must be in the future" } + + val timer = Timer(ctx, target) + futureQueue.add(timer) + + return timer + } + + override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) + + override fun pushBatch() { + batchIndex++ + } + + override fun popBatch() { + try { + // Flush the work if the platform is not already running + if (batchIndex == 1 && queue.isNotEmpty()) { + runEngine(clock.millis()) + } + } finally { + batchIndex-- + } + } + + /** + * Run all the enqueued actions for the specified [timestamp][now]. + */ + private fun runEngine(now: Long) { + val queue = queue + val futureQueue = futureQueue + val futureInvocations = futureInvocations + val visited = visited + + // Remove any entries in the `futureInvocations` queue from the past + while (true) { + val head = futureInvocations.peek() + if (head == null || head.timestamp > now) { + break + } + futureInvocations.poll() + } + + // Execute all scheduled updates at current timestamp + while (true) { + val timer = futureQueue.peek() ?: break + val ctx = timer.ctx + val target = timer.target + + assert(target >= now) { "Internal inconsistency: found update of the past" } + + if (target > now) { + break + } + + futureQueue.poll() + + ctx.pruneTimers(now) + + if (ctx.shouldUpdate(now)) { + ctx.doUpdate(now) + visited.add(ctx) + } else { + ctx.tryReschedule(now) + } + } + + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val ctx = queue.poll() ?: break + + if (ctx.shouldUpdate(now)) { + ctx.doUpdate(now) + visited.add(ctx) + } + } + + for (system in visited) { + system.onConverge(now) + } + + visited.clear() + } while (queue.isNotEmpty()) + + // Schedule an engine invocation for the next update to occur. + val headTimer = futureQueue.peek() + if (headTimer != null) { + trySchedule(now, futureInvocations, headTimer.target) + } + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + * @param scheduled The queue of scheduled invocations. + */ + private fun trySchedule(now: Long, scheduled: ArrayDeque, target: Long) { + while (true) { + val invocation = scheduled.peekFirst() + if (invocation == null || invocation.timestamp > target) { + // Case 2: A new timer was registered ahead of the other timers. + // Solution: Schedule a new scheduler invocation + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout( + target - now, + { + try { + batchIndex++ + runEngine(target) + } finally { + batchIndex-- + } + }, + context + ) + scheduled.addFirst(Invocation(target, handle)) + break + } else if (invocation.timestamp < target) { + // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted + // Solution: Cancel the next scheduler invocation + scheduled.pollFirst() + + invocation.cancel() + } else { + break + } + } + } + + /** + * A future engine invocation. + * + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private data class Invocation( + @JvmField val timestamp: Long, + @JvmField val handle: DisposableHandle + ) { + /** + * Cancel the engine invocation. + */ + fun cancel() = handle.dispose() + } + + /** + * An update call for [ctx] that is scheduled for [target]. + * + * This class represents an update in the future at [target] requested by [ctx]. + */ + class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable { + override fun compareTo(other: Timer): Int { + return target.compareTo(other.target) + } + + override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt new file mode 100644 index 00000000..17b82391 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowCounters +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.interference.InterferenceKey + +/** + * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. + */ +public interface FlowMultiplexer { + /** + * The inputs of the multiplexer that can be used to consume sources. + */ + public val inputs: Set + + /** + * The outputs of the multiplexer over which the flows will be distributed. + */ + public val outputs: Set + + /** + * The flow counters to track the flow metrics of all multiplexer inputs. + */ + public val counters: FlowCounters + + /** + * Create a new input on this multiplexer. + * + * @param key The key of the interference member to which the input belongs. + */ + public fun newInput(key: InterferenceKey? = null): FlowConsumer + + /** + * Remove [input] from this multiplexer. + */ + public fun removeInput(input: FlowConsumer) + + /** + * Add the specified [output] to the multiplexer. + */ + public fun addOutput(output: FlowConsumer) + + /** + * Clear all inputs and outputs from the switch. + */ + public fun clear() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt new file mode 100644 index 00000000..811d9460 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey +import java.util.ArrayDeque + +/** + * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means + * that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + * + * @param engine The [FlowEngine] driving the simulation. + */ +public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer { + override val inputs: Set + get() = _inputs + private val _inputs = mutableSetOf() + + override val outputs: Set + get() = _outputs + private val _outputs = mutableSetOf() + private val _availableOutputs = ArrayDeque() + + override val counters: FlowCounters = object : FlowCounters { + override val demand: Double + get() = _outputs.sumOf { it.counters.demand } + override val actual: Double + get() = _outputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _outputs.sumOf { it.counters.overcommit } + override val interference: Double + get() = _outputs.sumOf { it.counters.interference } + + override fun reset() { + for (input in _outputs) { + input.counters.reset() + } + } + + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val output = Input(forwarder) + _inputs += output + return output + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + + (input as Input).close() + } + + override fun addOutput(output: FlowConsumer) { + if (output in outputs) { + return + } + + val forwarder = FlowForwarder(engine) + + _outputs += output + _availableOutputs += forwarder + + output.startConsumer(object : FlowSource by forwarder { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) { + // De-register the output after it has finished + _outputs -= output + } + + forwarder.onEvent(conn, now, event) + } + }) + } + + override fun clear() { + for (input in _outputs) { + input.cancel() + } + _outputs.clear() + + // Inputs are implicitly cancelled by the output forwarders + _inputs.clear() + } + + /** + * An input on the multiplexer. + */ + private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { + /** + * Close the input. + */ + fun close() { + // We explicitly do not close the forwarder here in order to re-use it across input resources. + _inputs -= this + _availableOutputs += forwarder + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt new file mode 100644 index 00000000..9735f121 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceDomain +import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.FlowCountersImpl +import kotlin.math.max +import kotlin.math.min + +/** + * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing. + * + * @param engine The [FlowEngine] to drive the flow simulation. + * @param parent The parent flow system of the multiplexer. + * @param interferenceDomain The interference domain of the multiplexer. + */ +public class MaxMinFlowMultiplexer( + private val engine: FlowEngine, + private val parent: FlowSystem? = null, + private val interferenceDomain: InterferenceDomain? = null +) : FlowMultiplexer { + /** + * The inputs of the multiplexer. + */ + override val inputs: Set + get() = _inputs + private val _inputs = mutableSetOf() + private val _activeInputs = mutableListOf() + + /** + * The outputs of the multiplexer. + */ + override val outputs: Set + get() = _outputs + private val _outputs = mutableSetOf() + private val _activeOutputs = mutableListOf() + + /** + * The flow counters of this multiplexer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + /** + * The actual processing rate of the multiplexer. + */ + private var _rate = 0.0 + + /** + * The demanded processing rate of the input. + */ + private var _demand = 0.0 + + /** + * The capacity of the outputs. + */ + private var _capacity = 0.0 + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val provider = Input(_capacity, key) + _inputs.add(provider) + return provider + } + + override fun addOutput(output: FlowConsumer) { + val consumer = Output(output) + if (_outputs.add(output)) { + _activeOutputs.add(consumer) + output.startConsumer(consumer) + } + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + // This cast should always succeed since only `Input` instances should be added to `_inputs` + (input as Input).close() + } + + override fun clear() { + for (input in _activeOutputs) { + input.cancel() + } + _activeOutputs.clear() + + for (output in _activeInputs) { + output.cancel() + } + _activeInputs.clear() + } + + /** + * Converge the scheduler of the multiplexer. + */ + private fun runScheduler(now: Long) { + if (_schedulerActive) { + return + } + + _schedulerActive = true + try { + doSchedule(now) + } finally { + _schedulerActive = false + } + } + + /** + * Schedule the inputs over the outputs. + */ + private fun doSchedule(now: Long) { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + return + } + + val capacity = _capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pull(now) + + // Remove outputs that have finished + if (!input.isActive) { + inputIterator.remove() + } + } + + var demand = 0.0 + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + var remaining = activeInputs.size + for (input in activeInputs) { + val availableShare = availableCapacity / remaining-- + val grantedRate = min(input.allowedRate, availableShare) + + // Ignore empty sources + if (grantedRate <= 0.0) { + input.actualRate = 0.0 + continue + } + + input.actualRate = grantedRate + demand += input.limit + availableCapacity -= grantedRate + } + + val rate = capacity - availableCapacity + + _demand = demand + _rate = rate + + // Sort all consumers by their capacity + activeOutputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (output in activeOutputs) { + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + } + + /** + * Recompute the capacity of the multiplexer. + */ + private fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) + + // No-op if the capacity is unchanged + if (_capacity == newCapacity) { + return + } + + _capacity = newCapacity + + for (input in _inputs) { + input.capacity = newCapacity + } + } + + /** + * An internal [FlowConsumer] implementation for multiplexer inputs. + */ + private inner class Input(capacity: Double, val key: InterferenceKey?) : + AbstractFlowConsumer(engine, capacity), + FlowConsumerLogic, + Comparable { + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + + /** + * A flag to indicate that the input is closed. + */ + private var _isClosed: Boolean = false + + /** + * The timestamp at which we received the last command. + */ + private var _lastPull: Long = Long.MIN_VALUE + + /** + * Close the input. + * + * This method is invoked when the user removes an input from the switch. + */ + fun close() { + _isClosed = true + cancel() + } + + /* AbstractFlowConsumer */ + override fun createLogic(): FlowConsumerLogic = this + + override fun start(ctx: FlowConsumerContext) { + check(!_isClosed) { "Cannot re-use closed input" } + + _activeInputs += this + super.start(ctx) + } + + /* FlowConsumerLogic */ + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + doUpdateCounters(delta) + + actualRate = 0.0 + this.limit = rate + _lastPull = now + + runScheduler(now) + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + doUpdateCounters(delta) + + limit = 0.0 + actualRate = 0.0 + _lastPull = now + } + + /* Comparable */ + override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) + + /** + * Pull the source if necessary. + */ + fun pull(now: Long) { + val ctx = ctx + if (ctx != null && _lastPull < now) { + ctx.flush() + } + } + + /** + * Helper method to update the flow counters of the multiplexer. + */ + private fun doUpdateCounters(delta: Long) { + if (delta <= 0L) { + return + } + + // Compute the performance penalty due to flow interference + val perfScore = if (interferenceDomain != null) { + val load = _rate / capacity + interferenceDomain.apply(key, load) + } else { + 1.0 + } + + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualRate * deltaS + val remainingWork = work - actualWork + + updateCounters(work, actualWork, remainingWork) + + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += remainingWork + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + } + + /** + * An internal [FlowSource] implementation for multiplexer outputs. + */ + private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable { + /** + * The active [FlowConnection] of this source. + */ + private var _ctx: FlowConnection? = null + + /** + * The capacity of this output. + */ + val capacity: Double + get() = _ctx?.capacity ?: 0.0 + + /** + * Push the specified rate to the consumer. + */ + fun push(rate: Double) { + _ctx?.push(rate) + } + + /** + * Cancel this output. + */ + fun cancel() { + provider.cancel() + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + runScheduler(now) + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + assert(_ctx == null) { "Source running concurrently" } + _ctx = conn + updateCapacity() + } + FlowEvent.Exit -> { + _ctx = null + updateCapacity() + } + FlowEvent.Capacity -> updateCapacity() + else -> {} + } + } + + override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt new file mode 100644 index 00000000..d9779c6a --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource +import kotlin.math.roundToLong + +/** + * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization]. + */ +public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource { + + init { + require(amount >= 0.0) { "Amount must be positive" } + require(utilization > 0.0) { "Utilization must be positive" } + } + + private var remainingAmount = amount + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumed = conn.rate * delta / 1000.0 + val limit = conn.capacity * utilization + + remainingAmount -= consumed + + val duration = (remainingAmount / limit * 1000).roundToLong() + + return if (duration > 0) { + conn.push(limit) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt new file mode 100644 index 00000000..b3191ad3 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +/** + * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to + * finish a pull, before proceeding its operation. + */ +public class FlowSourceBarrier(public val parties: Int) { + private var counter = 0 + + /** + * Enter the barrier and determine whether the caller is the last to reach the barrier. + * + * @return `true` if the caller is the last to reach the barrier, `false` otherwise. + */ + public fun enter(): Boolean { + val last = ++counter == parties + if (last) { + counter = 0 + return true + } + return false + } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt new file mode 100644 index 00000000..7fcc0405 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import kotlin.math.min + +/** + * Helper class to expose an observable [speed] field describing the speed of the consumer. + */ +public class FlowSourceRateAdapter( + private val delegate: FlowSource, + private val callback: (Double) -> Unit = {} +) : FlowSource by delegate { + /** + * The resource processing speed at this instant. + */ + public var speed: Double = 0.0 + private set(value) { + if (field != value) { + callback(value) + field = value + } + } + + init { + callback(0.0) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + val oldSpeed = speed + + delegate.onEvent(conn, now, event) + + when (event) { + FlowEvent.Converge -> speed = conn.rate + FlowEvent.Capacity -> { + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == speed) { + speed = min(conn.capacity, speed) + } + } + FlowEvent.Exit -> speed = 0.0 + else -> {} + } + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + speed = 0.0 + + delegate.onFailure(conn, cause) + } + + override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt new file mode 100644 index 00000000..4d3ae61a --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource + +/** + * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time. + */ +public class TraceFlowSource(private val trace: Sequence) : FlowSource { + private var _iterator: Iterator? = null + private var _nextTarget = Long.MIN_VALUE + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + // Check whether the trace fragment was fully consumed, otherwise wait until we have done so + val nextTarget = _nextTarget + if (nextTarget > now) { + return now - nextTarget + } + + val iterator = checkNotNull(_iterator) + return if (iterator.hasNext()) { + val fragment = iterator.next() + _nextTarget = now + fragment.duration + conn.push(fragment.usage) + fragment.duration + } else { + conn.close() + Long.MAX_VALUE + } + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + check(_iterator == null) { "Source already running" } + _iterator = trace.iterator() + } + FlowEvent.Exit -> { + _iterator = null + } + else -> {} + } + } + + /** + * A fragment of the tgrace. + */ + public data class Fragment(val duration: Long, val usage: Double) +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt new file mode 100644 index 00000000..061ebea6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.* +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowConsumerContextImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource + +/** + * A test suite for the [FlowConsumerContextImpl] class. + */ +class FlowConsumerContextTest { + @Test + fun testFlushWithoutCommand() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + engine.scheduleSync(engine.clock.millis(), context) + } + + @Test + fun testIntermediateFlush() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = FixedFlowSource(1.0, 1.0) + + val logic = spyk(object : FlowConsumerLogic {}) + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 1.0 + + context.start() + delay(1) // Delay 1 ms to prevent hitting the fast path + engine.scheduleSync(engine.clock.millis(), context) + + verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) } + } + + @Test + fun testDoubleStart() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(0.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + assertThrows { + context.start() + } + } + + @Test + fun testIdempotentCapacityChange() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + }) + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 4200.0 + context.start() + context.capacity = 4200.0 + + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testFailureNoInfiniteLoop() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) throw IllegalStateException("onEvent") + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + throw IllegalStateException("onFailure") + } + }) + + val logic = object : FlowConsumerLogic {} + + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + delay(1) + + verify(exactly = 1) { consumer.onFailure(any(), any()) } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt new file mode 100644 index 00000000..cbc48a4e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource + +/** + * A test suite for the [FlowForwarder] class. + */ +internal class FlowForwarderTest { + @Test + fun testCancelImmediately() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + }) + + forwarder.close() + source.cancel() + } + + @Test + fun testCancel() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + forwarder.consume(object : FlowSource { + var isFirst = true + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + 10 * 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + }) + + forwarder.close() + source.cancel() + } + + @Test + fun testState() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + } + + assertFalse(forwarder.isActive) + + forwarder.startConsumer(consumer) + assertTrue(forwarder.isActive) + + assertThrows { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertFalse(forwarder.isActive) + + forwarder.close() + assertFalse(forwarder.isActive) + } + + @Test + fun testCancelPendingDelegate() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + }) + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + } + + @Test + fun testCancelPropagation() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + } + + @Test + fun testExitPropagation() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) + + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + } + + source.startConsumer(forwarder) + forwarder.consume(consumer) + yield() + + assertFalse(forwarder.isActive) + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) + + val consumer = spyk(FixedFlowSource(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testCounters() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) + + val consumer = FixedFlowSource(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + yield() + + assertEquals(2.0, source.counters.actual) + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt new file mode 100644 index 00000000..010a985e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter + +/** + * A test suite for the [FlowSink] class. + */ +internal class FlowSinkTest { + @Test + fun testSpeed() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(4200.0, 1.0) + + val res = mutableListOf() + val adapter = FlowSourceRateAdapter(consumer, res::add) + + provider.consume(adapter) + + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(engine, 1.0) + + val consumer = spyk(FixedFlowSource(2.0, 1.0)) + + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 + } + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testSpeedLimit() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(capacity, 2.0) + + val res = mutableListOf() + val adapter = FlowSourceRateAdapter(consumer, res::add) + + provider.consume(adapter) + + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } + } + + /** + * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or + * [FlowSource.onPull]. + */ + @Test + fun testIntermediateInterrupt() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + conn.pull() + } + } + + provider.consume(consumer) + } + + @Test + fun testInterrupt() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + lateinit var resCtx: FlowConnection + + val consumer = object : FlowSource { + var isFirst = true + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> resCtx = conn + else -> {} + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + 4000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + launch { + yield() + resCtx.pull() + } + provider.consume(consumer) + + assertEquals(0, clock.millis()) + } + + @Test + fun testFailure() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = mockk(relaxUnitFun = true) + every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) } + .throws(IllegalStateException()) + + assertThrows { + provider.consume(consumer) + } + } + + @Test + fun testExceptionPropagationOnNext() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = object : FlowSource { + var isFirst = true + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + 1000 + } else { + throw IllegalStateException() + } + } + } + + assertThrows { + provider.consume(consumer) + } + } + + @Test + fun testConcurrentConsumption() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(capacity, 1.0) + + assertThrows { + coroutineScope { + launch { provider.consume(consumer) } + provider.consume(consumer) + } + } + } + + @Test + fun testCancelDuringConsumption() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(capacity, 1.0) + + launch { provider.consume(consumer) } + delay(500) + provider.cancel() + + yield() + + assertEquals(500, clock.millis()) + } + + @Test + fun testInfiniteSleep() { + assertThrows { + runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + } + + provider.consume(consumer) + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt new file mode 100644 index 00000000..b503087e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +internal class SimResourceSwitchExclusiveTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val speed = mutableListOf() + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addOutput(forwarder) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = FixedFlowSource(duration * 3.2, 1.0) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : FlowSource { + var isFirst = true + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> isFirst = true + else -> {} + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + provider.consume(workload) + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + switch.newInput() + assertThrows { switch.newInput() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt new file mode 100644 index 00000000..089a8d78 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [FlowMultiplexer] implementations + */ +internal class SimResourceSwitchMaxMinTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val switch = MaxMinFlowMultiplexer(scheduler) + + val sources = List(2) { FlowSink(scheduler, 2000.0) } + sources.forEach { switch.addOutput(it) } + + val provider = switch.newInput() + val consumer = FixedFlowSource(2000.0, 1.0) + + try { + provider.consume(consumer) + yield() + } finally { + switch.clear() + } + } + + /** + * Test overcommitting of resources via the hypervisor with a single VM. + */ + @Test + fun testOvercommittedSingle() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val provider = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + provider.consume(workload) + yield() + } finally { + switch.clear() + } + + assertAll( + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workloadA = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + val workloadB = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3100.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 73.0) + ) + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val providerA = switch.newInput() + val providerB = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + + coroutineScope { + launch { providerA.consume(workloadA) } + providerB.consume(workloadB) + } + + yield() + } finally { + switch.clear() + } + assertAll( + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt new file mode 100644 index 00000000..8396d346 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl + +/** + * A test suite for the [FixedFlowSource] class. + */ +internal class FixedFlowSourceTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) + + val consumer = FixedFlowSource(1.0, 1.0) + + provider.consume(consumer) + assertEquals(1000, clock.millis()) + } + + @Test + fun testUtilization() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) + + val consumer = FixedFlowSource(1.0, 0.5) + + provider.consume(consumer) + assertEquals(2000, clock.millis()) + } +} -- cgit v1.2.3