From 4cc1d40d421c8736f8b21b360b61d6b065158b7a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 29 Sep 2021 23:56:16 +0200 Subject: refactor(simulator): Migrate to flow-based simulation This change renames the `opendc-simulator-resources` module into the `opendc-simulator-flow` module to indicate that the core simulation model of OpenDC is based around modelling and simulating flows. Previously, the distinction between resource consumer and provider, and input and output caused some confusion. By switching to a flow-based model, this distinction is now clear (as in, the water flows from source to consumer/sink). --- .../org/opendc/simulator/flow/FlowBenchmarks.kt | 140 ++++++++ .../opendc/simulator/flow/AbstractFlowConsumer.kt | 143 ++++++++ .../org/opendc/simulator/flow/FlowConnection.kt | 60 ++++ .../org/opendc/simulator/flow/FlowConsumer.kt | 109 ++++++ .../opendc/simulator/flow/FlowConsumerContext.kt | 45 +++ .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 56 +++ .../org/opendc/simulator/flow/FlowCounters.kt | 53 +++ .../kotlin/org/opendc/simulator/flow/FlowEngine.kt | 95 +++++ .../kotlin/org/opendc/simulator/flow/FlowEvent.kt | 48 +++ .../org/opendc/simulator/flow/FlowForwarder.kt | 217 +++++++++++ .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 61 ++++ .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 58 +++ .../kotlin/org/opendc/simulator/flow/FlowSystem.kt | 43 +++ .../flow/interference/InterferenceDomain.kt | 19 + .../simulator/flow/interference/InterferenceKey.kt | 28 ++ .../flow/internal/FlowConsumerContextImpl.kt | 356 ++++++++++++++++++ .../simulator/flow/internal/FlowCountersImpl.kt | 46 +++ .../simulator/flow/internal/FlowEngineImpl.kt | 297 +++++++++++++++ .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 70 ++++ .../flow/mux/ForwardingFlowMultiplexer.kt | 127 +++++++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 399 +++++++++++++++++++++ .../simulator/flow/source/FixedFlowSource.kt | 57 +++ .../simulator/flow/source/FlowSourceBarrier.kt | 52 +++ .../simulator/flow/source/FlowSourceRateAdapter.kt | 82 +++++ .../simulator/flow/source/TraceFlowSource.kt | 72 ++++ .../simulator/flow/FlowConsumerContextTest.kt | 152 ++++++++ .../org/opendc/simulator/flow/FlowForwarderTest.kt | 222 ++++++++++++ .../org/opendc/simulator/flow/FlowSinkTest.kt | 240 +++++++++++++ .../flow/mux/SimResourceSwitchExclusiveTest.kt | 157 ++++++++ .../flow/mux/SimResourceSwitchMaxMinTest.kt | 147 ++++++++ .../simulator/flow/source/FixedFlowSourceTest.kt | 57 +++ 31 files changed, 3708 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt new file mode 100644 index 00000000..4834f10f --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.launch +import org.opendc.simulator.core.SimulationCoroutineScope +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer +import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer +import org.opendc.simulator.flow.source.TraceFlowSource +import org.openjdk.jmh.annotations.* +import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.TimeUnit + +@State(Scope.Thread) +@Fork(1) +@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@OptIn(ExperimentalCoroutinesApi::class) +class FlowBenchmarks { + private lateinit var scope: SimulationCoroutineScope + private lateinit var engine: FlowEngine + + @Setup + fun setUp() { + scope = SimulationCoroutineScope() + engine = FlowEngine(scope.coroutineContext, scope.clock) + } + + @State(Scope.Thread) + class Workload { + lateinit var trace: Sequence + + @Setup + fun setUp() { + val random = ThreadLocalRandom.current() + val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + trace = entries.asSequence() + } + } + + @Benchmark + fun benchmarkSink(state: Workload) { + return scope.runBlockingSimulation { + val provider = FlowSink(engine, 4200.0) + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkForward(state: Workload) { + return scope.runBlockingSimulation { + val provider = FlowSink(engine, 4200.0) + val forwarder = FlowForwarder(engine) + provider.startConsumer(forwarder) + return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkMuxMaxMinSingleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = MaxMinFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkMuxMaxMinTripleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = MaxMinFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + repeat(3) { + launch { + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) + } + } + } + } + + @Benchmark + fun benchmarkMuxExclusiveSingleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = ForwardingFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + val provider = switch.newInput() + return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + } + } + + @Benchmark + fun benchmarkMuxExclusiveTripleSource(state: Workload) { + return scope.runBlockingSimulation { + val switch = ForwardingFlowMultiplexer(engine) + + switch.addOutput(FlowSink(engine, 3000.0)) + switch.addOutput(FlowSink(engine, 3000.0)) + + repeat(2) { + launch { + val provider = switch.newInput() + provider.consume(TraceFlowSource(state.trace)) + } + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt new file mode 100644 index 00000000..c8092082 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import org.opendc.simulator.flow.internal.FlowCountersImpl + +/** + * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. + */ +public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { + /** + * A flag to indicate that the flow consumer is active. + */ + public override val isActive: Boolean + get() = ctx != null + + /** + * The capacity of the consumer. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + ctx?.capacity = value + } + + /** + * The current processing rate of the consumer. + */ + public override val rate: Double + get() = ctx?.rate ?: 0.0 + + /** + * The flow processing rate demand at this instant. + */ + public override val demand: Double + get() = ctx?.demand ?: 0.0 + + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + /** + * The [FlowConsumerContext] that is currently running. + */ + protected var ctx: FlowConsumerContext? = null + private set + + /** + * Construct the [FlowConsumerLogic] instance for a new source. + */ + protected abstract fun createLogic(): FlowConsumerLogic + + /** + * Start the specified [FlowConsumerContext]. + */ + protected open fun start(ctx: FlowConsumerContext) { + ctx.start() + } + + /** + * The previous demand for the consumer. + */ + private var previousDemand = 0.0 + + /** + * Update the counters of the flow consumer. + */ + protected fun updateCounters(ctx: FlowConnection, delta: Long) { + val demand = previousDemand + previousDemand = ctx.demand + + if (delta <= 0) { + return + } + + val counters = _counters + val deltaS = delta / 1000.0 + val work = demand * deltaS + val actualWork = ctx.rate * deltaS + val remainingWork = work - actualWork + + counters.demand += work + counters.actual += actualWork + counters.overcommit += remainingWork + } + + /** + * Update the counters of the flow consumer. + */ + protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + val counters = _counters + counters.demand += demand + counters.actual += actual + counters.overcommit += overcommit + } + + final override fun startConsumer(source: FlowSource) { + check(ctx == null) { "Consumer is in invalid state" } + val ctx = engine.newContext(source, createLogic()) + + ctx.capacity = capacity + this.ctx = ctx + + start(ctx) + } + + final override fun pull() { + ctx?.pull() + } + + final override fun cancel() { + val ctx = ctx + if (ctx != null) { + this.ctx = null + ctx.close() + } + } + + override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt new file mode 100644 index 00000000..fa833961 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * An active connection between a [FlowSource] and [FlowConsumer]. + */ +public interface FlowConnection : AutoCloseable { + /** + * The capacity of the connection. + */ + public val capacity: Double + + /** + * The flow rate over the connection. + */ + public val rate: Double + + /** + * The flow demand of the source. + */ + public val demand: Double + + /** + * Pull the source. + */ + public fun pull() + + /** + * Push the given flow [rate] over this connection. + * + * @param rate The rate of the flow to push. + */ + public fun push(rate: Double) + + /** + * Disconnect the consumer from its source. + */ + public override fun close() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt new file mode 100644 index 00000000..3a6e2e97 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * A consumer of a [FlowSource]. + */ +public interface FlowConsumer { + /** + * A flag to indicate that the consumer is currently consuming a [FlowSource]. + */ + public val isActive: Boolean + + /** + * The flow capacity of this consumer. + */ + public val capacity: Double + + /** + * The current flow rate of the consumer. + */ + public val rate: Double + + /** + * The current flow demand. + */ + public val demand: Double + + /** + * The flow counters to track the flow metrics of the consumer. + */ + public val counters: FlowCounters + + /** + * Start consuming the specified [source]. + * + * @throws IllegalStateException if the consumer is already active. + */ + public fun startConsumer(source: FlowSource) + + /** + * Ask the consumer to pull its source. + * + * If the consumer is not active, this operation will be a no-op. + */ + public fun pull() + + /** + * Disconnect the consumer from its source. + * + * If the consumer is not active, this operation will be a no-op. + */ + public fun cancel() +} + +/** + * Consume the specified [source] and suspend execution until the source is fully consumed or failed. + */ +public suspend fun FlowConsumer.consume(source: FlowSource) { + return suspendCancellableCoroutine { cont -> + startConsumer(object : FlowSource by source { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + source.onEvent(conn, now, event) + + if (event == FlowEvent.Exit && !cont.isCompleted) { + cont.resume(Unit) + } + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + try { + source.onFailure(conn, cause) + cont.resumeWithException(cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + cont.resumeWithException(e) + } + } + + override fun toString(): String = "SuspendingFlowSource" + }) + + cont.invokeOnCancellation { cancel() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt new file mode 100644 index 00000000..75b2d25b --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A controllable [FlowConnection]. + * + * This interface is used by [FlowConsumer]s to control the connection between it and the source. + */ +public interface FlowConsumerContext : FlowConnection { + /** + * The capacity of the connection. + */ + public override var capacity: Double + + /** + * Start the flow over the connection. + */ + public fun start() + + /** + * Synchronously flush the changes of the connection. + */ + public fun flush() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt new file mode 100644 index 00000000..c69cb17e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A collection of callbacks associated with a [FlowConsumer]. + */ +public interface FlowConsumerLogic { + /** + * This method is invoked when a [FlowSource] changes the rate of flow to this consumer. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the update is occurring. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + * @param rate The requested processing rate of the source. + */ + public fun onPush(ctx: FlowConsumerContext, now: Long, delta: Long, rate: Double) {} + + /** + * This method is invoked when the flow graph has converged into a steady-state system. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. + */ + public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} + + /** + * This method is invoked when the [FlowSource] is completed. + * + * @param ctx The context in which the provider runs. + * @param now The virtual timestamp in milliseconds at which the provider finished. + * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + */ + public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {} +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt new file mode 100644 index 00000000..e15d7643 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * An interface that tracks cumulative counts of the flow accumulation over a stage. + */ +public interface FlowCounters { + /** + * The accumulated flow that a source wanted to push over the connection. + */ + public val demand: Double + + /** + * The accumulated flow that was actually transferred over the connection. + */ + public val actual: Double + + /** + * The accumulated flow that could not be transferred over the connection. + */ + public val overcommit: Double + + /** + * The accumulated flow lost due to interference between sources. + */ + public val interference: Double + + /** + * Reset the flow counters. + */ + public fun reset() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt new file mode 100644 index 00000000..65224827 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import org.opendc.simulator.flow.internal.FlowEngineImpl +import java.time.Clock +import kotlin.coroutines.CoroutineContext + +/** + * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s. + * + * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation + * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. + */ +public interface FlowEngine { + /** + * The virtual [Clock] associated with this engine. + */ + public val clock: Clock + + /** + * Create a new [FlowConsumerContext] with the given [provider]. + * + * @param consumer The consumer logic. + * @param provider The logic of the resource provider. + */ + public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext + + /** + * Start batching the execution of resource updates until [popBatch] is called. + * + * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs + * simultaneously) in a single state update. + * + * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the + * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called + * the same amount of times. To simplify batching, see [batch]. + */ + public fun pushBatch() + + /** + * Stop the batching of resource updates and run the interpreter on the batch. + * + * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. + */ + public fun popBatch() + + public companion object { + /** + * Construct a new [FlowEngine] implementation. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ + @JvmStatic + @JvmName("create") + public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine { + return FlowEngineImpl(context, clock) + } + } +} + +/** + * Batch the execution of several interrupts into a single call. + * + * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. + */ +public inline fun FlowEngine.batch(block: () -> Unit) { + try { + pushBatch() + block() + } finally { + popBatch() + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt new file mode 100644 index 00000000..14c85183 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A flow event that is communicated to a [FlowSource]. + */ +public enum class FlowEvent { + /** + * This event is emitted to the source when it has started. + */ + Start, + + /** + * This event is emitted to the source when it is stopped. + */ + Exit, + + /** + * This event is emitted to the source when the system has converged into a steady state. + */ + Converge, + + /** + * This event is emitted to the source when the capacity of the consumer has changed. + */ + Capacity, +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt new file mode 100644 index 00000000..2074033e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import org.opendc.simulator.flow.internal.FlowCountersImpl + +/** + * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. + * + * @param engine The [FlowEngine] the forwarder runs in. + * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. + */ +public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { + /** + * The delegate [FlowSource]. + */ + private var delegate: FlowSource? = null + + /** + * A flag to indicate that the delegate was started. + */ + private var hasDelegateStarted: Boolean = false + + /** + * The exposed [FlowConnection]. + */ + private val _ctx = object : FlowConnection { + override val capacity: Double + get() = _innerCtx?.capacity ?: 0.0 + + override val demand: Double + get() = _innerCtx?.demand ?: 0.0 + + override val rate: Double + get() = _innerCtx?.rate ?: 0.0 + + override fun pull() { + _innerCtx?.pull() + } + + override fun push(rate: Double) { + _innerCtx?.push(rate) + _demand = rate + } + + override fun close() { + val delegate = checkNotNull(delegate) { "Delegate not active" } + + if (isCoupled) + _innerCtx?.close() + else + _innerCtx?.push(0.0) + + // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we + // reset beforehand the existing state and check whether it has been updated afterwards + reset() + + delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + } + } + + /** + * The [FlowConnection] in which the forwarder runs. + */ + private var _innerCtx: FlowConnection? = null + + override val isActive: Boolean + get() = delegate != null + + override val capacity: Double + get() = _ctx.capacity + + override val rate: Double + get() = _ctx.rate + + override val demand: Double + get() = _ctx.demand + + override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + override fun startConsumer(source: FlowSource) { + check(delegate == null) { "Forwarder already active" } + + delegate = source + + // Pull to replace the source + pull() + } + + override fun pull() { + _ctx.pull() + } + + override fun cancel() { + val delegate = delegate + val ctx = _innerCtx + + if (delegate != null) { + this.delegate = null + + if (ctx != null) { + delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit) + } + } + } + + override fun close() { + val ctx = _innerCtx + + if (ctx != null) { + this._innerCtx = null + ctx.pull() + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val delegate = delegate + + if (!hasDelegateStarted) { + start() + } + + updateCounters(conn, delta) + + return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + _innerCtx = conn + } + FlowEvent.Exit -> { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onEvent(this._ctx, now, FlowEvent.Exit) + } + } + else -> delegate?.onEvent(this._ctx, now, event) + } + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + delegate.onFailure(this._ctx, cause) + } + } + + /** + * Start the delegate. + */ + private fun start() { + val delegate = delegate ?: return + delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start) + + hasDelegateStarted = true + } + + /** + * Reset the delegate. + */ + private fun reset() { + delegate = null + hasDelegateStarted = false + } + + /** + * The requested flow rate. + */ + private var _demand: Double = 0.0 + + /** + * Update the flow counters for the transformer. + */ + private fun updateCounters(ctx: FlowConnection, delta: Long) { + if (delta <= 0) { + return + } + + val counters = _counters + val deltaS = delta / 1000.0 + val work = _demand * deltaS + val actualWork = ctx.rate * deltaS + counters.demand += work + counters.actual += actualWork + counters.overcommit += (work - actualWork) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt new file mode 100644 index 00000000..fb6ca85d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A [FlowSink] represents a sink with a fixed capacity. + * + * @param initialCapacity The initial capacity of the resource. + * @param engine The engine that is used for driving the flow simulation. + * @param parent The parent flow system. + */ +public class FlowSink( + private val engine: FlowEngine, + initialCapacity: Double, + private val parent: FlowSystem? = null +) : AbstractFlowConsumer(engine, initialCapacity) { + + override fun createLogic(): FlowConsumerLogic { + return object : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + updateCounters(ctx, delta) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + updateCounters(ctx, delta) + cancel() + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + } + } + + override fun toString(): String = "FlowSink[capacity=$capacity]" +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt new file mode 100644 index 00000000..077b4d38 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A source of flow that is consumed by a [FlowConsumer]. + * + * Implementations of this interface should be considered stateful and must be assumed not to be re-usable + * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. + */ +public interface FlowSource { + /** + * This method is invoked when the source is pulled. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the pull is occurring. + * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. + * @return The duration after which the resource consumer should be pulled again. + */ + public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long + + /** + * This method is invoked when an event has occurred. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the event is occurring. + * @param event The event that has occurred. + */ + public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} + + /** + * This method is invoked when the source throws an exception. + * + * @param conn The connection between the source and consumer. + * @param cause The cause of the failure. + */ + public fun onFailure(conn: FlowConnection, cause: Throwable) {} +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt new file mode 100644 index 00000000..db6aa69f --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A system of possible multiple sub-resources. + * + * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the + * resource provider. + */ +public interface FlowSystem { + /** + * The parent system to which this system belongs or `null` if it has no parent. + */ + public val parent: FlowSystem? + + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param timestamp The timestamp at which the system converged. + */ + public fun onConverge(timestamp: Long) +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt new file mode 100644 index 00000000..aa2713b6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt @@ -0,0 +1,19 @@ +package org.opendc.simulator.flow.interference + +import org.opendc.simulator.flow.FlowSource + +/** + * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur + * performance variability due to operating on the same resource and therefore causing interference. + */ +public interface InterferenceDomain { + /** + * Compute the performance score of a participant in this interference domain. + * + * @param key The participant to obtain the score of or `null` if the participant has no key. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the resource consumer, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(key: InterferenceKey?, load: Double): Double +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt new file mode 100644 index 00000000..d28ebde5 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.interference + +/** + * A key that uniquely identifies a participant of an interference domain. + */ +public interface InterferenceKey diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt new file mode 100644 index 00000000..9f3afc4d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -0,0 +1,356 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import org.opendc.simulator.flow.* +import java.util.ArrayDeque +import kotlin.math.max +import kotlin.math.min + +/** + * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. + */ +internal class FlowConsumerContextImpl( + private val engine: FlowEngineImpl, + private val source: FlowSource, + private val logic: FlowConsumerLogic +) : FlowConsumerContext { + /** + * The clock to track simulation time. + */ + private val _clock = engine.clock + + /** + * The capacity of the resource. + */ + override var capacity: Double = 0.0 + set(value) { + val oldValue = field + + // Only changes will be propagated + if (value != oldValue) { + field = value + onCapacityChange() + } + } + + /** + * A flag to indicate the state of the context. + */ + private var _state = State.Pending + + /** + * The current processing speed of the resource. + */ + override val rate: Double + get() = _rate + private var _rate = 0.0 + + /** + * The current resource processing demand. + */ + override val demand: Double + get() = _limit + + /** + * The current state of the resource context. + */ + private var _limit: Double = 0.0 + private var _activeLimit: Double = 0.0 + private var _deadline: Long = Long.MIN_VALUE + + /** + * A flag to indicate that an update is active. + */ + private var _updateActive = false + + /** + * The update flag indicating why the update was triggered. + */ + private var _flag: Int = 0 + + /** + * The timestamp of calls to the callbacks. + */ + private var _lastUpdate: Long = Long.MIN_VALUE + private var _lastConvergence: Long = Long.MAX_VALUE + + /** + * The timers at which the context is scheduled to be interrupted. + */ + private val _timers: ArrayDeque = ArrayDeque() + + override fun start() { + check(_state == State.Pending) { "Consumer is already started" } + engine.batch { + source.onEvent(this, _clock.millis(), FlowEvent.Start) + _state = State.Active + pull() + } + } + + override fun close() { + if (_state == State.Stopped) { + return + } + + engine.batch { + _state = State.Stopped + if (!_updateActive) { + val now = _clock.millis() + val delta = max(0, now - _lastUpdate) + doStop(now, delta) + + // FIX: Make sure the context converges + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(_clock.millis()) + } + } + } + + override fun pull() { + if (_state == State.Stopped) { + return + } + + _flag = _flag or FLAG_INTERRUPT + scheduleUpdate(_clock.millis()) + } + + override fun flush() { + if (_state == State.Stopped) { + return + } + + engine.scheduleSync(_clock.millis(), this) + } + + override fun push(rate: Double) { + if (_limit == rate) { + return + } + + _limit = rate + + // Invalidate only if the active limit is change and no update is active + // If an update is active, it will already get picked up at the end of the update + if (_activeLimit != rate && !_updateActive) { + _flag = _flag or FLAG_INVALIDATE + scheduleUpdate(_clock.millis()) + } + } + + /** + * Determine whether the state of the resource context should be updated. + */ + fun shouldUpdate(timestamp: Long): Boolean { + // Either the resource context is flagged or there is a pending update at this timestamp + return _flag != 0 || _limit != _activeLimit || _deadline == timestamp + } + + /** + * Update the state of the resource context. + */ + fun doUpdate(now: Long) { + val oldState = _state + if (oldState != State.Active) { + return + } + + val lastUpdate = _lastUpdate + + _lastUpdate = now + _updateActive = true + + val delta = max(0, now - lastUpdate) + + try { + val duration = source.onPull(this, now, delta) + val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration + + // Reset update flags + _flag = 0 + + // Check whether the state has changed after [consumer.onNext] + when (_state) { + State.Active -> { + logic.onPush(this, now, delta, _limit) + + // Schedule an update at the new deadline + scheduleUpdate(now, newDeadline) + } + State.Stopped -> doStop(now, delta) + State.Pending -> throw IllegalStateException("Illegal transition to pending state") + } + + // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value + val newLimit = _limit + + // Flush the changes to the flow + _activeLimit = newLimit + _deadline = newDeadline + _rate = min(capacity, newLimit) + } catch (cause: Throwable) { + doFail(now, delta, cause) + } finally { + _updateActive = false + } + } + + /** + * Prune the elapsed timers from this context. + */ + fun pruneTimers(now: Long) { + val timers = _timers + while (true) { + val head = timers.peek() + if (head == null || head.target > now) { + break + } + timers.poll() + } + } + + /** + * Try to re-schedule the resource context in case it was skipped. + */ + fun tryReschedule(now: Long) { + val deadline = _deadline + if (deadline > now && deadline != Long.MAX_VALUE) { + scheduleUpdate(now, deadline) + } + } + + /** + * This method is invoked when the system converges into a steady state. + */ + fun onConverge(timestamp: Long) { + val delta = max(0, timestamp - _lastConvergence) + _lastConvergence = timestamp + + try { + if (_state == State.Active) { + source.onEvent(this, timestamp, FlowEvent.Converge) + } + + logic.onConverge(this, timestamp, delta) + } catch (cause: Throwable) { + doFail(timestamp, max(0, timestamp - _lastUpdate), cause) + } + } + + override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" + + /** + * Stop the resource context. + */ + private fun doStop(now: Long, delta: Long) { + try { + source.onEvent(this, now, FlowEvent.Exit) + logic.onFinish(this, now, delta) + } catch (cause: Throwable) { + doFail(now, delta, cause) + } finally { + _deadline = Long.MAX_VALUE + _limit = 0.0 + } + } + + /** + * Fail the resource consumer. + */ + private fun doFail(now: Long, delta: Long, cause: Throwable) { + try { + source.onFailure(this, cause) + } catch (e: Throwable) { + e.addSuppressed(cause) + e.printStackTrace() + } + + logic.onFinish(this, now, delta) + } + + /** + * Indicate that the capacity of the resource has changed. + */ + private fun onCapacityChange() { + // Do not inform the consumer if it has not been started yet + if (_state != State.Active) { + return + } + + engine.batch { + // Inform the consumer of the capacity change. This might already trigger an interrupt. + source.onEvent(this, _clock.millis(), FlowEvent.Capacity) + + pull() + } + } + + /** + * Schedule an update for this resource context. + */ + private fun scheduleUpdate(now: Long) { + engine.scheduleImmediate(now, this) + } + + /** + * Schedule a delayed update for this resource context. + */ + private fun scheduleUpdate(now: Long, target: Long) { + val timers = _timers + if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { + timers.addFirst(engine.scheduleDelayed(now, this, target)) + } + } + + /** + * The state of a resource context. + */ + private enum class State { + /** + * The resource context is pending and the resource is waiting to be consumed. + */ + Pending, + + /** + * The resource context is active and the resource is currently being consumed. + */ + Active, + + /** + * The resource context is stopped and the resource cannot be consumed anymore. + */ + Stopped + } + + /** + * A flag to indicate that the context should be invalidated. + */ + private val FLAG_INVALIDATE = 0b01 + + /** + * A flag to indicate that the context should be interrupted. + */ + private val FLAG_INTERRUPT = 0b10 +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt new file mode 100644 index 00000000..141d335d --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import org.opendc.simulator.flow.FlowCounters + +/** + * Mutable implementation of the [FlowCounters] interface. + */ +internal class FlowCountersImpl : FlowCounters { + override var demand: Double = 0.0 + override var actual: Double = 0.0 + override var overcommit: Double = 0.0 + override var interference: Double = 0.0 + + override fun reset() { + demand = 0.0 + actual = 0.0 + overcommit = 0.0 + interference = 0.0 + } + + override fun toString(): String { + return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt new file mode 100644 index 00000000..1a50da2c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Runnable +import org.opendc.simulator.flow.* +import java.time.Clock +import java.util.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext + +/** + * Internal implementation of the [FlowEngine] interface. + * + * @param context The coroutine context to use. + * @param clock The virtual simulation clock. + */ +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { + /** + * The [Delay] instance that provides scheduled execution of [Runnable]s. + */ + @OptIn(InternalCoroutinesApi::class) + private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The queue of connection updates that are scheduled for immediate execution. + */ + private val queue = ArrayDeque() + + /** + * A priority queue containing the connection updates to be scheduled in the future. + */ + private val futureQueue = PriorityQueue() + + /** + * The stack of engine invocations to occur in the future. + */ + private val futureInvocations = ArrayDeque() + + /** + * The systems that have been visited during the engine cycle. + */ + private val visited = linkedSetOf() + + /** + * The index in the batch stack. + */ + private var batchIndex = 0 + + /** + * A flag to indicate that the engine is currently active. + */ + private val isRunning: Boolean + get() = batchIndex > 0 + + /** + * Update the specified [ctx] synchronously. + */ + fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { + ctx.doUpdate(now) + visited.add(ctx) + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (isRunning) { + return + } + + try { + batchIndex++ + runEngine(now) + } finally { + batchIndex-- + } + } + + /** + * Enqueue the specified [ctx] to be updated immediately during the active engine cycle. + * + * This method should be used when the state of a flow context is invalidated/interrupted and needs to be + * re-computed. In case no engine is currently active, the engine will be started. + */ + fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) { + queue.add(ctx) + + // In-case the engine is already running in the call-stack, return immediately. The changes will be picked + // up by the active engine. + if (isRunning) { + return + } + + try { + batchIndex++ + runEngine(now) + } finally { + batchIndex-- + } + } + + /** + * Schedule the engine to run at [target] to update the flow contexts. + * + * This method will override earlier calls to this method for the same [ctx]. + * + * @param now The current virtual timestamp. + * @param ctx The flow context to which the event applies. + * @param target The timestamp when the interrupt should happen. + */ + fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer { + val futureQueue = futureQueue + + require(target >= now) { "Timestamp must be in the future" } + + val timer = Timer(ctx, target) + futureQueue.add(timer) + + return timer + } + + override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) + + override fun pushBatch() { + batchIndex++ + } + + override fun popBatch() { + try { + // Flush the work if the platform is not already running + if (batchIndex == 1 && queue.isNotEmpty()) { + runEngine(clock.millis()) + } + } finally { + batchIndex-- + } + } + + /** + * Run all the enqueued actions for the specified [timestamp][now]. + */ + private fun runEngine(now: Long) { + val queue = queue + val futureQueue = futureQueue + val futureInvocations = futureInvocations + val visited = visited + + // Remove any entries in the `futureInvocations` queue from the past + while (true) { + val head = futureInvocations.peek() + if (head == null || head.timestamp > now) { + break + } + futureInvocations.poll() + } + + // Execute all scheduled updates at current timestamp + while (true) { + val timer = futureQueue.peek() ?: break + val ctx = timer.ctx + val target = timer.target + + assert(target >= now) { "Internal inconsistency: found update of the past" } + + if (target > now) { + break + } + + futureQueue.poll() + + ctx.pruneTimers(now) + + if (ctx.shouldUpdate(now)) { + ctx.doUpdate(now) + visited.add(ctx) + } else { + ctx.tryReschedule(now) + } + } + + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val ctx = queue.poll() ?: break + + if (ctx.shouldUpdate(now)) { + ctx.doUpdate(now) + visited.add(ctx) + } + } + + for (system in visited) { + system.onConverge(now) + } + + visited.clear() + } while (queue.isNotEmpty()) + + // Schedule an engine invocation for the next update to occur. + val headTimer = futureQueue.peek() + if (headTimer != null) { + trySchedule(now, futureInvocations, headTimer.target) + } + } + + /** + * Try to schedule an engine invocation at the specified [target]. + * + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + * @param scheduled The queue of scheduled invocations. + */ + private fun trySchedule(now: Long, scheduled: ArrayDeque, target: Long) { + while (true) { + val invocation = scheduled.peekFirst() + if (invocation == null || invocation.timestamp > target) { + // Case 2: A new timer was registered ahead of the other timers. + // Solution: Schedule a new scheduler invocation + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout( + target - now, + { + try { + batchIndex++ + runEngine(target) + } finally { + batchIndex-- + } + }, + context + ) + scheduled.addFirst(Invocation(target, handle)) + break + } else if (invocation.timestamp < target) { + // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted + // Solution: Cancel the next scheduler invocation + scheduled.pollFirst() + + invocation.cancel() + } else { + break + } + } + } + + /** + * A future engine invocation. + * + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private data class Invocation( + @JvmField val timestamp: Long, + @JvmField val handle: DisposableHandle + ) { + /** + * Cancel the engine invocation. + */ + fun cancel() = handle.dispose() + } + + /** + * An update call for [ctx] that is scheduled for [target]. + * + * This class represents an update in the future at [target] requested by [ctx]. + */ + class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable { + override fun compareTo(other: Timer): Int { + return target.compareTo(other.target) + } + + override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt new file mode 100644 index 00000000..17b82391 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.FlowConsumer +import org.opendc.simulator.flow.FlowCounters +import org.opendc.simulator.flow.FlowSource +import org.opendc.simulator.flow.interference.InterferenceKey + +/** + * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. + */ +public interface FlowMultiplexer { + /** + * The inputs of the multiplexer that can be used to consume sources. + */ + public val inputs: Set + + /** + * The outputs of the multiplexer over which the flows will be distributed. + */ + public val outputs: Set + + /** + * The flow counters to track the flow metrics of all multiplexer inputs. + */ + public val counters: FlowCounters + + /** + * Create a new input on this multiplexer. + * + * @param key The key of the interference member to which the input belongs. + */ + public fun newInput(key: InterferenceKey? = null): FlowConsumer + + /** + * Remove [input] from this multiplexer. + */ + public fun removeInput(input: FlowConsumer) + + /** + * Add the specified [output] to the multiplexer. + */ + public fun addOutput(output: FlowConsumer) + + /** + * Clear all inputs and outputs from the switch. + */ + public fun clear() +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt new file mode 100644 index 00000000..811d9460 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceKey +import java.util.ArrayDeque + +/** + * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means + * that a single input is directly connected to an output and that the multiplexer can only support as many + * inputs as outputs. + * + * @param engine The [FlowEngine] driving the simulation. + */ +public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMultiplexer { + override val inputs: Set + get() = _inputs + private val _inputs = mutableSetOf() + + override val outputs: Set + get() = _outputs + private val _outputs = mutableSetOf() + private val _availableOutputs = ArrayDeque() + + override val counters: FlowCounters = object : FlowCounters { + override val demand: Double + get() = _outputs.sumOf { it.counters.demand } + override val actual: Double + get() = _outputs.sumOf { it.counters.actual } + override val overcommit: Double + get() = _outputs.sumOf { it.counters.overcommit } + override val interference: Double + get() = _outputs.sumOf { it.counters.interference } + + override fun reset() { + for (input in _outputs) { + input.counters.reset() + } + } + + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + } + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val output = Input(forwarder) + _inputs += output + return output + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + + (input as Input).close() + } + + override fun addOutput(output: FlowConsumer) { + if (output in outputs) { + return + } + + val forwarder = FlowForwarder(engine) + + _outputs += output + _availableOutputs += forwarder + + output.startConsumer(object : FlowSource by forwarder { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) { + // De-register the output after it has finished + _outputs -= output + } + + forwarder.onEvent(conn, now, event) + } + }) + } + + override fun clear() { + for (input in _outputs) { + input.cancel() + } + _outputs.clear() + + // Inputs are implicitly cancelled by the output forwarders + _inputs.clear() + } + + /** + * An input on the multiplexer. + */ + private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { + /** + * Close the input. + */ + fun close() { + // We explicitly do not close the forwarder here in order to re-use it across input resources. + _inputs -= this + _availableOutputs += forwarder + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt new file mode 100644 index 00000000..9735f121 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.interference.InterferenceDomain +import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.FlowCountersImpl +import kotlin.math.max +import kotlin.math.min + +/** + * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing. + * + * @param engine The [FlowEngine] to drive the flow simulation. + * @param parent The parent flow system of the multiplexer. + * @param interferenceDomain The interference domain of the multiplexer. + */ +public class MaxMinFlowMultiplexer( + private val engine: FlowEngine, + private val parent: FlowSystem? = null, + private val interferenceDomain: InterferenceDomain? = null +) : FlowMultiplexer { + /** + * The inputs of the multiplexer. + */ + override val inputs: Set + get() = _inputs + private val _inputs = mutableSetOf() + private val _activeInputs = mutableListOf() + + /** + * The outputs of the multiplexer. + */ + override val outputs: Set + get() = _outputs + private val _outputs = mutableSetOf() + private val _activeOutputs = mutableListOf() + + /** + * The flow counters of this multiplexer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = FlowCountersImpl() + + /** + * The actual processing rate of the multiplexer. + */ + private var _rate = 0.0 + + /** + * The demanded processing rate of the input. + */ + private var _demand = 0.0 + + /** + * The capacity of the outputs. + */ + private var _capacity = 0.0 + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + + override fun newInput(key: InterferenceKey?): FlowConsumer { + val provider = Input(_capacity, key) + _inputs.add(provider) + return provider + } + + override fun addOutput(output: FlowConsumer) { + val consumer = Output(output) + if (_outputs.add(output)) { + _activeOutputs.add(consumer) + output.startConsumer(consumer) + } + } + + override fun removeInput(input: FlowConsumer) { + if (!_inputs.remove(input)) { + return + } + // This cast should always succeed since only `Input` instances should be added to `_inputs` + (input as Input).close() + } + + override fun clear() { + for (input in _activeOutputs) { + input.cancel() + } + _activeOutputs.clear() + + for (output in _activeInputs) { + output.cancel() + } + _activeInputs.clear() + } + + /** + * Converge the scheduler of the multiplexer. + */ + private fun runScheduler(now: Long) { + if (_schedulerActive) { + return + } + + _schedulerActive = true + try { + doSchedule(now) + } finally { + _schedulerActive = false + } + } + + /** + * Schedule the inputs over the outputs. + */ + private fun doSchedule(now: Long) { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + return + } + + val capacity = _capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pull(now) + + // Remove outputs that have finished + if (!input.isActive) { + inputIterator.remove() + } + } + + var demand = 0.0 + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + var remaining = activeInputs.size + for (input in activeInputs) { + val availableShare = availableCapacity / remaining-- + val grantedRate = min(input.allowedRate, availableShare) + + // Ignore empty sources + if (grantedRate <= 0.0) { + input.actualRate = 0.0 + continue + } + + input.actualRate = grantedRate + demand += input.limit + availableCapacity -= grantedRate + } + + val rate = capacity - availableCapacity + + _demand = demand + _rate = rate + + // Sort all consumers by their capacity + activeOutputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (output in activeOutputs) { + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + } + + /** + * Recompute the capacity of the multiplexer. + */ + private fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) + + // No-op if the capacity is unchanged + if (_capacity == newCapacity) { + return + } + + _capacity = newCapacity + + for (input in _inputs) { + input.capacity = newCapacity + } + } + + /** + * An internal [FlowConsumer] implementation for multiplexer inputs. + */ + private inner class Input(capacity: Double, val key: InterferenceKey?) : + AbstractFlowConsumer(engine, capacity), + FlowConsumerLogic, + Comparable { + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + + /** + * A flag to indicate that the input is closed. + */ + private var _isClosed: Boolean = false + + /** + * The timestamp at which we received the last command. + */ + private var _lastPull: Long = Long.MIN_VALUE + + /** + * Close the input. + * + * This method is invoked when the user removes an input from the switch. + */ + fun close() { + _isClosed = true + cancel() + } + + /* AbstractFlowConsumer */ + override fun createLogic(): FlowConsumerLogic = this + + override fun start(ctx: FlowConsumerContext) { + check(!_isClosed) { "Cannot re-use closed input" } + + _activeInputs += this + super.start(ctx) + } + + /* FlowConsumerLogic */ + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + doUpdateCounters(delta) + + actualRate = 0.0 + this.limit = rate + _lastPull = now + + runScheduler(now) + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + doUpdateCounters(delta) + + limit = 0.0 + actualRate = 0.0 + _lastPull = now + } + + /* Comparable */ + override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) + + /** + * Pull the source if necessary. + */ + fun pull(now: Long) { + val ctx = ctx + if (ctx != null && _lastPull < now) { + ctx.flush() + } + } + + /** + * Helper method to update the flow counters of the multiplexer. + */ + private fun doUpdateCounters(delta: Long) { + if (delta <= 0L) { + return + } + + // Compute the performance penalty due to flow interference + val perfScore = if (interferenceDomain != null) { + val load = _rate / capacity + interferenceDomain.apply(key, load) + } else { + 1.0 + } + + val deltaS = delta / 1000.0 + val work = limit * deltaS + val actualWork = actualRate * deltaS + val remainingWork = work - actualWork + + updateCounters(work, actualWork, remainingWork) + + val distCounters = _counters + distCounters.demand += work + distCounters.actual += actualWork + distCounters.overcommit += remainingWork + distCounters.interference += actualWork * max(0.0, 1 - perfScore) + } + } + + /** + * An internal [FlowSource] implementation for multiplexer outputs. + */ + private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable { + /** + * The active [FlowConnection] of this source. + */ + private var _ctx: FlowConnection? = null + + /** + * The capacity of this output. + */ + val capacity: Double + get() = _ctx?.capacity ?: 0.0 + + /** + * Push the specified rate to the consumer. + */ + fun push(rate: Double) { + _ctx?.push(rate) + } + + /** + * Cancel this output. + */ + fun cancel() { + provider.cancel() + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + runScheduler(now) + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + assert(_ctx == null) { "Source running concurrently" } + _ctx = conn + updateCapacity() + } + FlowEvent.Exit -> { + _ctx = null + updateCapacity() + } + FlowEvent.Capacity -> updateCapacity() + else -> {} + } + } + + override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt new file mode 100644 index 00000000..d9779c6a --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowSource +import kotlin.math.roundToLong + +/** + * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization]. + */ +public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource { + + init { + require(amount >= 0.0) { "Amount must be positive" } + require(utilization > 0.0) { "Utilization must be positive" } + } + + private var remainingAmount = amount + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val consumed = conn.rate * delta / 1000.0 + val limit = conn.capacity * utilization + + remainingAmount -= consumed + + val duration = (remainingAmount / limit * 1000).roundToLong() + + return if (duration > 0) { + conn.push(limit) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt new file mode 100644 index 00000000..b3191ad3 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +/** + * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to + * finish a pull, before proceeding its operation. + */ +public class FlowSourceBarrier(public val parties: Int) { + private var counter = 0 + + /** + * Enter the barrier and determine whether the caller is the last to reach the barrier. + * + * @return `true` if the caller is the last to reach the barrier, `false` otherwise. + */ + public fun enter(): Boolean { + val last = ++counter == parties + if (last) { + counter = 0 + return true + } + return false + } + + /** + * Reset the barrier. + */ + public fun reset() { + counter = 0 + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt new file mode 100644 index 00000000..7fcc0405 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource +import kotlin.math.min + +/** + * Helper class to expose an observable [speed] field describing the speed of the consumer. + */ +public class FlowSourceRateAdapter( + private val delegate: FlowSource, + private val callback: (Double) -> Unit = {} +) : FlowSource by delegate { + /** + * The resource processing speed at this instant. + */ + public var speed: Double = 0.0 + private set(value) { + if (field != value) { + callback(value) + field = value + } + } + + init { + callback(0.0) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + val oldSpeed = speed + + delegate.onEvent(conn, now, event) + + when (event) { + FlowEvent.Converge -> speed = conn.rate + FlowEvent.Capacity -> { + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == speed) { + speed = min(conn.capacity, speed) + } + } + FlowEvent.Exit -> speed = 0.0 + else -> {} + } + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + speed = 0.0 + + delegate.onFailure(conn, cause) + } + + override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt new file mode 100644 index 00000000..4d3ae61a --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.opendc.simulator.flow.FlowConnection +import org.opendc.simulator.flow.FlowEvent +import org.opendc.simulator.flow.FlowSource + +/** + * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time. + */ +public class TraceFlowSource(private val trace: Sequence) : FlowSource { + private var _iterator: Iterator? = null + private var _nextTarget = Long.MIN_VALUE + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + // Check whether the trace fragment was fully consumed, otherwise wait until we have done so + val nextTarget = _nextTarget + if (nextTarget > now) { + return now - nextTarget + } + + val iterator = checkNotNull(_iterator) + return if (iterator.hasNext()) { + val fragment = iterator.next() + _nextTarget = now + fragment.duration + conn.push(fragment.usage) + fragment.duration + } else { + conn.close() + Long.MAX_VALUE + } + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> { + check(_iterator == null) { "Source already running" } + _iterator = trace.iterator() + } + FlowEvent.Exit -> { + _iterator = null + } + else -> {} + } + } + + /** + * A fragment of the tgrace. + */ + public data class Fragment(val duration: Long, val usage: Double) +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt new file mode 100644 index 00000000..061ebea6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.* +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowConsumerContextImpl +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource + +/** + * A test suite for the [FlowConsumerContextImpl] class. + */ +class FlowConsumerContextTest { + @Test + fun testFlushWithoutCommand() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + engine.scheduleSync(engine.clock.millis(), context) + } + + @Test + fun testIntermediateFlush() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = FixedFlowSource(1.0, 1.0) + + val logic = spyk(object : FlowConsumerLogic {}) + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 1.0 + + context.start() + delay(1) // Delay 1 ms to prevent hitting the fast path + engine.scheduleSync(engine.clock.millis(), context) + + verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) } + } + + @Test + fun testDoubleStart() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(0.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + assertThrows { + context.start() + } + } + + @Test + fun testIdempotentCapacityChange() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (now == 0L) { + conn.push(1.0) + 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + }) + + val logic = object : FlowConsumerLogic {} + val context = FlowConsumerContextImpl(engine, consumer, logic) + context.capacity = 4200.0 + context.start() + context.capacity = 4200.0 + + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testFailureNoInfiniteLoop() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Exit) throw IllegalStateException("onEvent") + } + + override fun onFailure(conn: FlowConnection, cause: Throwable) { + throw IllegalStateException("onFailure") + } + }) + + val logic = object : FlowConsumerLogic {} + + val context = FlowConsumerContextImpl(engine, consumer, logic) + + context.start() + + delay(1) + + verify(exactly = 1) { consumer.onFailure(any(), any()) } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt new file mode 100644 index 00000000..cbc48a4e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource + +/** + * A test suite for the [FlowForwarder] class. + */ +internal class FlowForwarderTest { + @Test + fun testCancelImmediately() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + }) + + forwarder.close() + source.cancel() + } + + @Test + fun testCancel() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + forwarder.consume(object : FlowSource { + var isFirst = true + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + 10 * 1000 + } else { + conn.close() + Long.MAX_VALUE + } + } + }) + + forwarder.close() + source.cancel() + } + + @Test + fun testState() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + } + + assertFalse(forwarder.isActive) + + forwarder.startConsumer(consumer) + assertTrue(forwarder.isActive) + + assertThrows { forwarder.startConsumer(consumer) } + + forwarder.cancel() + assertFalse(forwarder.isActive) + + forwarder.close() + assertFalse(forwarder.isActive) + } + + @Test + fun testCancelPendingDelegate() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + + val consumer = spyk(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + }) + + forwarder.startConsumer(consumer) + forwarder.cancel() + + verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + } + + @Test + fun testCancelStartedDelegate() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + forwarder.cancel() + + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + } + + @Test + fun testCancelPropagation() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + val consumer = spyk(FixedFlowSource(2000.0, 1.0)) + + source.startConsumer(forwarder) + yield() + forwarder.startConsumer(consumer) + yield() + source.cancel() + + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + } + + @Test + fun testExitPropagation() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) + + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + } + + source.startConsumer(forwarder) + forwarder.consume(consumer) + yield() + + assertFalse(forwarder.isActive) + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) + + val consumer = spyk(FixedFlowSource(2.0, 1.0)) + source.startConsumer(forwarder) + + coroutineScope { + launch { forwarder.consume(consumer) } + delay(1000) + source.capacity = 0.5 + } + + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testCounters() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 1.0) + + val consumer = FixedFlowSource(2.0, 1.0) + source.startConsumer(forwarder) + + forwarder.consume(consumer) + + yield() + + assertEquals(2.0, source.counters.actual) + assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } + assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } + assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(2000, clock.millis()) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt new file mode 100644 index 00000000..010a985e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +import io.mockk.every +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.* +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter + +/** + * A test suite for the [FlowSink] class. + */ +internal class FlowSinkTest { + @Test + fun testSpeed() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(4200.0, 1.0) + + val res = mutableListOf() + val adapter = FlowSourceRateAdapter(consumer, res::add) + + provider.consume(adapter) + + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } + } + + @Test + fun testAdjustCapacity() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(engine, 1.0) + + val consumer = spyk(FixedFlowSource(2.0, 1.0)) + + coroutineScope { + launch { provider.consume(consumer) } + delay(1000) + provider.capacity = 0.5 + } + assertEquals(3000, clock.millis()) + verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + } + + @Test + fun testSpeedLimit() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(capacity, 2.0) + + val res = mutableListOf() + val adapter = FlowSourceRateAdapter(consumer, res::add) + + provider.consume(adapter) + + assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } + } + + /** + * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or + * [FlowSource.onPull]. + */ + @Test + fun testIntermediateInterrupt() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + conn.close() + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + conn.pull() + } + } + + provider.consume(consumer) + } + + @Test + fun testInterrupt() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + lateinit var resCtx: FlowConnection + + val consumer = object : FlowSource { + var isFirst = true + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> resCtx = conn + else -> {} + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + 4000 + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + launch { + yield() + resCtx.pull() + } + provider.consume(consumer) + + assertEquals(0, clock.millis()) + } + + @Test + fun testFailure() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = mockk(relaxUnitFun = true) + every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) } + .throws(IllegalStateException()) + + assertThrows { + provider.consume(consumer) + } + } + + @Test + fun testExceptionPropagationOnNext() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = object : FlowSource { + var isFirst = true + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + 1000 + } else { + throw IllegalStateException() + } + } + } + + assertThrows { + provider.consume(consumer) + } + } + + @Test + fun testConcurrentConsumption() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(capacity, 1.0) + + assertThrows { + coroutineScope { + launch { provider.consume(consumer) } + provider.consume(consumer) + } + } + } + + @Test + fun testCancelDuringConsumption() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = FixedFlowSource(capacity, 1.0) + + launch { provider.consume(consumer) } + delay(500) + provider.cancel() + + yield() + + assertEquals(500, clock.millis()) + } + + @Test + fun testInfiniteSleep() { + assertThrows { + runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val capacity = 4200.0 + val provider = FlowSink(engine, capacity) + + val consumer = object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long = Long.MAX_VALUE + } + + provider.consume(consumer) + } + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt new file mode 100644 index 00000000..b503087e --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +internal class SimResourceSwitchExclusiveTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val speed = mutableListOf() + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addOutput(forwarder) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = FixedFlowSource(duration * 3.2, 1.0) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : FlowSource { + var isFirst = true + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> isFirst = true + else -> {} + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + provider.consume(workload) + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + switch.newInput() + assertThrows { switch.newInput() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt new file mode 100644 index 00000000..089a8d78 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [FlowMultiplexer] implementations + */ +internal class SimResourceSwitchMaxMinTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val switch = MaxMinFlowMultiplexer(scheduler) + + val sources = List(2) { FlowSink(scheduler, 2000.0) } + sources.forEach { switch.addOutput(it) } + + val provider = switch.newInput() + val consumer = FixedFlowSource(2000.0, 1.0) + + try { + provider.consume(consumer) + yield() + } finally { + switch.clear() + } + } + + /** + * Test overcommitting of resources via the hypervisor with a single VM. + */ + @Test + fun testOvercommittedSingle() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val provider = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + provider.consume(workload) + yield() + } finally { + switch.clear() + } + + assertAll( + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workloadA = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + val workloadB = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3100.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 73.0) + ) + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val providerA = switch.newInput() + val providerB = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + + coroutineScope { + launch { providerA.consume(workloadA) } + providerB.consume(workloadB) + } + + yield() + } finally { + switch.clear() + } + assertAll( + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt new file mode 100644 index 00000000..8396d346 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.source + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl + +/** + * A test suite for the [FixedFlowSource] class. + */ +internal class FixedFlowSourceTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) + + val consumer = FixedFlowSource(1.0, 1.0) + + provider.consume(consumer) + assertEquals(1000, clock.millis()) + } + + @Test + fun testUtilization() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val provider = FlowSink(scheduler, 1.0) + + val consumer = FixedFlowSource(1.0, 0.5) + + provider.consume(consumer) + assertEquals(2000, clock.millis()) + } +} -- cgit v1.2.3 From 7b2d03add3170b9142bf42c5a64aaa263773caf7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 11:55:27 +0200 Subject: refactor(simulator): Separate push and pull flags This change separates the push and pull flags in FlowConsumerContextImpl, meaning that sources can now push directly without pulling and vice versa. --- .../flow/internal/FlowConsumerContextImpl.kt | 185 ++++++++++++--------- .../simulator/flow/FlowConsumerContextTest.kt | 17 -- 2 files changed, 103 insertions(+), 99 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9f3afc4d..f62528ed 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -36,12 +36,7 @@ internal class FlowConsumerContextImpl( private val logic: FlowConsumerLogic ) : FlowConsumerContext { /** - * The clock to track simulation time. - */ - private val _clock = engine.clock - - /** - * The capacity of the resource. + * The capacity of the connection. */ override var capacity: Double = 0.0 set(value) { @@ -55,45 +50,56 @@ internal class FlowConsumerContextImpl( } /** - * A flag to indicate the state of the context. - */ - private var _state = State.Pending - - /** - * The current processing speed of the resource. + * The current processing rate of the connection. */ override val rate: Double get() = _rate private var _rate = 0.0 /** - * The current resource processing demand. + * The current flow processing demand. */ override val demand: Double - get() = _limit + get() = _demand + + /** + * The clock to track simulation time. + */ + private val _clock = engine.clock + + /** + * A flag to indicate the state of the connection. + */ + private var _state = State.Pending /** - * The current state of the resource context. + * The current state of the connection. */ - private var _limit: Double = 0.0 - private var _activeLimit: Double = 0.0 - private var _deadline: Long = Long.MIN_VALUE + private var _demand: Double = 0.0 // The current (pending) demand of the source + private var _activeDemand: Double = 0.0 // The previous demand of the source + private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer + + /** + * A flag to indicate that the source should be pulled. + */ + private var _isPulled = false /** * A flag to indicate that an update is active. */ - private var _updateActive = false + private var _isUpdateActive = false /** - * The update flag indicating why the update was triggered. + * A flag to indicate that an immediate update is scheduled. */ - private var _flag: Int = 0 + private var _isImmediateUpdateScheduled = false /** * The timestamp of calls to the callbacks. */ - private var _lastUpdate: Long = Long.MIN_VALUE - private var _lastConvergence: Long = Long.MAX_VALUE + private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` + private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` + private var _lastConvergence: Long = Long.MAX_VALUE // Last call to `onConvergence` /** * The timers at which the context is scheduled to be interrupted. @@ -110,35 +116,35 @@ internal class FlowConsumerContextImpl( } override fun close() { - if (_state == State.Stopped) { + if (_state == State.Closed) { return } engine.batch { - _state = State.Stopped - if (!_updateActive) { + _state = State.Closed + if (!_isUpdateActive) { val now = _clock.millis() - val delta = max(0, now - _lastUpdate) + val delta = max(0, now - _lastPull) doStop(now, delta) // FIX: Make sure the context converges - _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(_clock.millis()) + pull() } } } override fun pull() { - if (_state == State.Stopped) { + if (_state == State.Closed) { return } - _flag = _flag or FLAG_INTERRUPT - scheduleUpdate(_clock.millis()) + _isPulled = true + scheduleImmediate() } override fun flush() { - if (_state == State.Stopped) { + // Do not attempt to flush the connection if the connection is closed or an update is already active + if (_state == State.Closed || _isUpdateActive) { return } @@ -146,26 +152,28 @@ internal class FlowConsumerContextImpl( } override fun push(rate: Double) { - if (_limit == rate) { + if (_demand == rate) { return } - _limit = rate + _demand = rate - // Invalidate only if the active limit is change and no update is active + // Invalidate only if the active demand is changed and no update is active // If an update is active, it will already get picked up at the end of the update - if (_activeLimit != rate && !_updateActive) { - _flag = _flag or FLAG_INVALIDATE - scheduleUpdate(_clock.millis()) + if (_activeDemand != rate && !_isUpdateActive) { + scheduleImmediate() } } /** - * Determine whether the state of the resource context should be updated. + * Determine whether the state of the flow connection should be updated. */ fun shouldUpdate(timestamp: Long): Boolean { - // Either the resource context is flagged or there is a pending update at this timestamp - return _flag != 0 || _limit != _activeLimit || _deadline == timestamp + // The flow connection should be updated for three reasons: + // (1) The source should be pulled (after a call to `pull`) + // (2) The demand of the source has changed (after a call to `push`) + // (3) The timer of the source expired + return _isPulled || _demand != _activeDemand || _deadline == timestamp } /** @@ -177,43 +185,58 @@ internal class FlowConsumerContextImpl( return } - val lastUpdate = _lastUpdate + _isUpdateActive = true + _isImmediateUpdateScheduled = false - _lastUpdate = now - _updateActive = true - - val delta = max(0, now - lastUpdate) + val lastPush = _lastPush + val pushDelta = max(0, now - lastPush) try { - val duration = source.onPull(this, now, delta) - val newDeadline = if (duration != Long.MAX_VALUE) now + duration else duration - - // Reset update flags - _flag = 0 + // Pull the source if (1) `pull` is called or (2) the timer of the source has expired + val deadline = if (_isPulled || _deadline == now) { + val lastPull = _lastPull + val pullDelta = max(0, now - lastPull) + + _isPulled = false + _lastPull = now + + val duration = source.onPull(this, now, pullDelta) + if (duration != Long.MAX_VALUE) + now + duration + else + duration + } else { + _deadline + } // Check whether the state has changed after [consumer.onNext] when (_state) { State.Active -> { - logic.onPush(this, now, delta, _limit) + val demand = _demand + if (demand != _activeDemand) { + _lastPush = now - // Schedule an update at the new deadline - scheduleUpdate(now, newDeadline) + logic.onPush(this, now, pushDelta, demand) + } } - State.Stopped -> doStop(now, delta) + State.Closed -> doStop(now, pushDelta) State.Pending -> throw IllegalStateException("Illegal transition to pending state") } // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value - val newLimit = _limit + val newLimit = _demand // Flush the changes to the flow - _activeLimit = newLimit - _deadline = newDeadline + _activeDemand = newLimit + _deadline = deadline _rate = min(capacity, newLimit) + + // Schedule an update at the new deadline + scheduleDelayed(now, deadline) } catch (cause: Throwable) { - doFail(now, delta, cause) + doFail(now, pushDelta, cause) } finally { - _updateActive = false + _isUpdateActive = false } } @@ -237,7 +260,7 @@ internal class FlowConsumerContextImpl( fun tryReschedule(now: Long) { val deadline = _deadline if (deadline > now && deadline != Long.MAX_VALUE) { - scheduleUpdate(now, deadline) + scheduleDelayed(now, deadline) } } @@ -255,7 +278,7 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, timestamp, delta) } catch (cause: Throwable) { - doFail(timestamp, max(0, timestamp - _lastUpdate), cause) + doFail(timestamp, max(0, timestamp - _lastPull), cause) } } @@ -272,7 +295,7 @@ internal class FlowConsumerContextImpl( doFail(now, delta, cause) } finally { _deadline = Long.MAX_VALUE - _limit = 0.0 + _demand = 0.0 } } @@ -308,16 +331,24 @@ internal class FlowConsumerContextImpl( } /** - * Schedule an update for this resource context. + * Schedule an immediate update for this connection. */ - private fun scheduleUpdate(now: Long) { + private fun scheduleImmediate() { + // In case an immediate update is already scheduled, no need to do anything + if (_isImmediateUpdateScheduled) { + return + } + + _isImmediateUpdateScheduled = true + + val now = _clock.millis() engine.scheduleImmediate(now, this) } /** * Schedule a delayed update for this resource context. */ - private fun scheduleUpdate(now: Long, target: Long) { + private fun scheduleDelayed(now: Long, target: Long) { val timers = _timers if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { timers.addFirst(engine.scheduleDelayed(now, this, target)) @@ -325,32 +356,22 @@ internal class FlowConsumerContextImpl( } /** - * The state of a resource context. + * The state of a flow connection. */ private enum class State { /** - * The resource context is pending and the resource is waiting to be consumed. + * The connection is pending and the consumer is waiting to consume the source. */ Pending, /** - * The resource context is active and the resource is currently being consumed. + * The connection is active and the source is currently being consumed. */ Active, /** - * The resource context is stopped and the resource cannot be consumed anymore. + * The connection is closed and the source cannot be consumed through this connection anymore. */ - Stopped + Closed } - - /** - * A flag to indicate that the context should be invalidated. - */ - private val FLAG_INVALIDATE = 0b01 - - /** - * A flag to indicate that the context should be interrupted. - */ - private val FLAG_INTERRUPT = 0b10 } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index 061ebea6..380fd38a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -28,7 +28,6 @@ import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.internal.FlowConsumerContextImpl import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource /** * A test suite for the [FlowConsumerContextImpl] class. @@ -55,22 +54,6 @@ class FlowConsumerContextTest { engine.scheduleSync(engine.clock.millis(), context) } - @Test - fun testIntermediateFlush() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = FixedFlowSource(1.0, 1.0) - - val logic = spyk(object : FlowConsumerLogic {}) - val context = FlowConsumerContextImpl(engine, consumer, logic) - context.capacity = 1.0 - - context.start() - delay(1) // Delay 1 ms to prevent hitting the fast path - engine.scheduleSync(engine.clock.millis(), context) - - verify(exactly = 2) { logic.onPush(any(), any(), any(), any()) } - } - @Test fun testDoubleStart() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) -- cgit v1.2.3 From c3fe047be5d0026b50874efc671de54f01b6d5ee Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 12:22:43 +0200 Subject: perf(simulator): Do not use set for tracking convergence This change removes the use of a HashSet for tracking the flow connections that can converge. A HashSet requires an allocation for every addition, which caused a significant overhead. The new approach using an ArrayDeque should not allocate any memory. --- .../flow/internal/FlowConsumerContextImpl.kt | 20 +++++++++++++++++--- .../opendc/simulator/flow/internal/FlowEngineImpl.kt | 15 ++++++++------- 2 files changed, 25 insertions(+), 10 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index f62528ed..a4d82a3d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -94,6 +94,11 @@ internal class FlowConsumerContextImpl( */ private var _isImmediateUpdateScheduled = false + /** + * A flag that indicates to the [FlowEngine] that the context is already enqueued to converge. + */ + private var _willConverge: Boolean = false + /** * The timestamp of calls to the callbacks. */ @@ -177,12 +182,18 @@ internal class FlowConsumerContextImpl( } /** - * Update the state of the resource context. + * Update the state of the flow connection. + * + * @param now The current virtual timestamp. + * @return A flag to indicate whether the connection has already been updated before convergence. */ - fun doUpdate(now: Long) { + fun doUpdate(now: Long): Boolean { + val willConverge = _willConverge + _willConverge = true + val oldState = _state if (oldState != State.Active) { - return + return willConverge } _isUpdateActive = true @@ -238,6 +249,8 @@ internal class FlowConsumerContextImpl( } finally { _isUpdateActive = false } + + return willConverge } /** @@ -270,6 +283,7 @@ internal class FlowConsumerContextImpl( fun onConverge(timestamp: Long) { val delta = max(0, timestamp - _lastConvergence) _lastConvergence = timestamp + _willConverge = false try { if (_state == State.Active) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 1a50da2c..5f15fbed 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /** * The systems that have been visited during the engine cycle. */ - private val visited = linkedSetOf() + private val visited = ArrayDeque() /** * The index in the batch stack. @@ -80,8 +80,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * Update the specified [ctx] synchronously. */ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { - ctx.doUpdate(now) - visited.add(ctx) + if (!ctx.doUpdate(now)) { + visited.add(ctx) + } // In-case the engine is already running in the call-stack, return immediately. The changes will be picked // up by the active engine. @@ -192,8 +193,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va ctx.pruneTimers(now) if (ctx.shouldUpdate(now)) { - ctx.doUpdate(now) - visited.add(ctx) + if (!ctx.doUpdate(now)) { + visited.add(ctx) + } } else { ctx.tryReschedule(now) } @@ -206,8 +208,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va while (true) { val ctx = queue.poll() ?: break - if (ctx.shouldUpdate(now)) { - ctx.doUpdate(now) + if (ctx.shouldUpdate(now) && !ctx.doUpdate(now)) { visited.add(ctx) } } -- cgit v1.2.3 From 7b3a31b11df76870b965748fd8f7e712682a9d30 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 13:18:43 +0200 Subject: perf(simulator): Manage flow connection timers more efficiently This change reduces the number of operations necessary to manage the timers of a flow connection. --- .../flow/internal/FlowConsumerContextImpl.kt | 34 +++++++++++++--------- .../simulator/flow/internal/FlowEngineImpl.kt | 3 +- 2 files changed, 23 insertions(+), 14 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index a4d82a3d..fc9c8059 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -109,7 +109,8 @@ internal class FlowConsumerContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private val _timers: ArrayDeque = ArrayDeque() + private var _timer: FlowEngineImpl.Timer? = null + private val _pendingTimers: ArrayDeque = ArrayDeque(5) override fun start() { check(_state == State.Pending) { "Consumer is already started" } @@ -256,15 +257,10 @@ internal class FlowConsumerContextImpl( /** * Prune the elapsed timers from this context. */ - fun pruneTimers(now: Long) { - val timers = _timers - while (true) { - val head = timers.peek() - if (head == null || head.target > now) { - break - } - timers.poll() - } + fun updateTimers() { + // Invariant: Any pending timer should only point to a future timestamp + // See also `scheduleDelayed` + _timer = _pendingTimers.poll() } /** @@ -363,9 +359,21 @@ internal class FlowConsumerContextImpl( * Schedule a delayed update for this resource context. */ private fun scheduleDelayed(now: Long, target: Long) { - val timers = _timers - if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { - timers.addFirst(engine.scheduleDelayed(now, this, target)) + // Ignore any target scheduled at the maximum value + // This indicates that the sources does not want to register a timer + if (target == Long.MAX_VALUE) { + return + } + + val timer = _timer + + if (timer == null) { + // No existing timer exists, so schedule a new timer and update the head + _timer = engine.scheduleDelayed(now, this, target) + } else if (target < timer.target) { + // Existing timer is further in the future, so schedule a new timer ahead of it + _timer = engine.scheduleDelayed(now, this, target) + _pendingTimers.addFirst(timer) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 5f15fbed..c8170a43 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -190,7 +190,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va futureQueue.poll() - ctx.pruneTimers(now) + // Update the existing timers of the connection + ctx.updateTimers() if (ctx.shouldUpdate(now)) { if (!ctx.doUpdate(now)) { -- cgit v1.2.3 From b0fc93f818e5e735e972a04f5aa49e0ebe1de181 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 14:35:30 +0200 Subject: refactor(simulator): Remove failure callback from FlowSource This change removes the `onFailure` method from FlowSource. Instead, the FlowConsumer will receive the reason for failure of the source. --- .../org/opendc/simulator/flow/FlowConsumer.kt | 26 ++-- .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 5 +- .../org/opendc/simulator/flow/FlowForwarder.kt | 85 ++++++----- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 2 +- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 8 -- .../flow/internal/FlowConsumerContextImpl.kt | 32 +++-- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 2 +- .../simulator/flow/source/FlowSourceRateAdapter.kt | 44 +++--- .../simulator/flow/FlowConsumerContextTest.kt | 31 ---- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 96 +++++++++++++ .../flow/mux/ExclusiveFlowMultiplexerTest.kt | 157 +++++++++++++++++++++ .../flow/mux/MaxMinFlowMultiplexerTest.kt | 147 +++++++++++++++++++ .../flow/mux/SimResourceSwitchExclusiveTest.kt | 157 --------------------- .../flow/mux/SimResourceSwitchMaxMinTest.kt | 147 ------------------- 14 files changed, 513 insertions(+), 426 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index 3a6e2e97..df2c4fab 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -82,22 +82,26 @@ public interface FlowConsumer { */ public suspend fun FlowConsumer.consume(source: FlowSource) { return suspendCancellableCoroutine { cont -> - startConsumer(object : FlowSource by source { - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - source.onEvent(conn, now, event) - - if (event == FlowEvent.Exit && !cont.isCompleted) { - cont.resume(Unit) + startConsumer(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return try { + source.onPull(conn, now, delta) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause } } - override fun onFailure(conn: FlowConnection, cause: Throwable) { + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { try { - source.onFailure(conn, cause) + source.onEvent(conn, now, event) + + if (event == FlowEvent.Exit && !cont.isCompleted) { + cont.resume(Unit) + } + } catch (cause: Throwable) { cont.resumeWithException(cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - cont.resumeWithException(e) + throw cause } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index c69cb17e..ef94ab22 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -46,11 +46,12 @@ public interface FlowConsumerLogic { public fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {} /** - * This method is invoked when the [FlowSource] is completed. + * This method is invoked when the [FlowSource] completed or failed. * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the provider finished. * @param delta The virtual duration between this call and the last call to [onPush] in milliseconds. + * @param cause The cause of the failure or `null` if the source completed. */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) {} + public fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 2074033e..bc01a11b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow +import mu.KotlinLogging import org.opendc.simulator.flow.internal.FlowCountersImpl /** @@ -31,6 +32,11 @@ import org.opendc.simulator.flow.internal.FlowCountersImpl * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. */ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled: Boolean = false) : FlowSource, FlowConsumer, AutoCloseable { + /** + * The logging instance of this connection. + */ + private val logger = KotlinLogging.logger {} + /** * The delegate [FlowSource]. */ @@ -59,23 +65,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } override fun push(rate: Double) { + if (delegate == null) { + return + } + _innerCtx?.push(rate) _demand = rate } override fun close() { - val delegate = checkNotNull(delegate) { "Delegate not active" } - - if (isCoupled) - _innerCtx?.close() - else - _innerCtx?.push(0.0) + val delegate = delegate ?: return + val hasDelegateStarted = hasDelegateStarted // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we // reset beforehand the existing state and check whether it has been updated afterwards reset() - delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + if (hasDelegateStarted) { + delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + } } } @@ -114,16 +122,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } override fun cancel() { - val delegate = delegate - val ctx = _innerCtx - - if (delegate != null) { - this.delegate = null - - if (ctx != null) { - delegate.onEvent(this._ctx, engine.clock.millis(), FlowEvent.Exit) - } - } + _ctx.close() } override fun close() { @@ -144,34 +143,42 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled updateCounters(conn, delta) - return delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + return try { + delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + + reset() + Long.MAX_VALUE + } } override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { when (event) { - FlowEvent.Start -> { - _innerCtx = conn - } + FlowEvent.Start -> _innerCtx = conn FlowEvent.Exit -> { _innerCtx = null val delegate = delegate if (delegate != null) { reset() - delegate.onEvent(this._ctx, now, FlowEvent.Exit) + + try { + delegate.onEvent(this._ctx, now, FlowEvent.Exit) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + } } } - else -> delegate?.onEvent(this._ctx, now, event) - } - } - - override fun onFailure(conn: FlowConnection, cause: Throwable) { - _innerCtx = null + else -> + try { + delegate?.onEvent(this._ctx, now, event) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } - val delegate = delegate - if (delegate != null) { - reset() - delegate.onFailure(this._ctx, cause) + _innerCtx = null + reset() + } } } @@ -180,15 +187,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled */ private fun start() { val delegate = delegate ?: return - delegate.onEvent(checkNotNull(_innerCtx), engine.clock.millis(), FlowEvent.Start) - hasDelegateStarted = true + try { + delegate.onEvent(_ctx, engine.clock.millis(), FlowEvent.Start) + hasDelegateStarted = true + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + reset() + } } /** * Reset the delegate. */ private fun reset() { + if (isCoupled) + _innerCtx?.close() + else + _innerCtx?.push(0.0) + delegate = null hasDelegateStarted = false } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index fb6ca85d..fc590177 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -46,7 +46,7 @@ public class FlowSink( updateCounters(ctx, delta) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { updateCounters(ctx, delta) cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 077b4d38..70687b4f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -47,12 +47,4 @@ public interface FlowSource { * @param event The event that has occurred. */ public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} - - /** - * This method is invoked when the source throws an exception. - * - * @param conn The connection between the source and consumer. - * @param cause The cause of the failure. - */ - public fun onFailure(conn: FlowConnection, cause: Throwable) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index fc9c8059..a74f89b4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow.internal +import mu.KotlinLogging import org.opendc.simulator.flow.* import java.util.ArrayDeque import kotlin.math.max @@ -35,6 +36,11 @@ internal class FlowConsumerContextImpl( private val source: FlowSource, private val logic: FlowConsumerLogic ) : FlowConsumerContext { + /** + * The logging instance of this connection. + */ + private val logger = KotlinLogging.logger {} + /** * The capacity of the connection. */ @@ -131,7 +137,7 @@ internal class FlowConsumerContextImpl( if (!_isUpdateActive) { val now = _clock.millis() val delta = max(0, now - _lastPull) - doStop(now, delta) + doStopSource(now, delta) // FIX: Make sure the context converges pull() @@ -231,7 +237,7 @@ internal class FlowConsumerContextImpl( logic.onPush(this, now, pushDelta, demand) } } - State.Closed -> doStop(now, pushDelta) + State.Closed -> doStopSource(now, pushDelta) State.Pending -> throw IllegalStateException("Illegal transition to pending state") } @@ -246,7 +252,7 @@ internal class FlowConsumerContextImpl( // Schedule an update at the new deadline scheduleDelayed(now, deadline) } catch (cause: Throwable) { - doFail(now, pushDelta, cause) + doFailSource(now, pushDelta, cause) } finally { _isUpdateActive = false } @@ -288,21 +294,21 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, timestamp, delta) } catch (cause: Throwable) { - doFail(timestamp, max(0, timestamp - _lastPull), cause) + doFailSource(timestamp, max(0, timestamp - _lastPull), cause) } } override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" /** - * Stop the resource context. + * Stop the [FlowSource]. */ - private fun doStop(now: Long, delta: Long) { + private fun doStopSource(now: Long, delta: Long) { try { source.onEvent(this, now, FlowEvent.Exit) - logic.onFinish(this, now, delta) + logic.onFinish(this, now, delta, null) } catch (cause: Throwable) { - doFail(now, delta, cause) + doFailSource(now, delta, cause) } finally { _deadline = Long.MAX_VALUE _demand = 0.0 @@ -310,17 +316,15 @@ internal class FlowConsumerContextImpl( } /** - * Fail the resource consumer. + * Fail the [FlowSource]. */ - private fun doFail(now: Long, delta: Long, cause: Throwable) { + private fun doFailSource(now: Long, delta: Long, cause: Throwable) { try { - source.onFailure(this, cause) + logic.onFinish(this, now, delta, cause) } catch (e: Throwable) { e.addSuppressed(cause) - e.printStackTrace() + logger.error(e) { "Uncaught exception" } } - - logic.onFinish(this, now, delta) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 9735f121..a3e108f6 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -292,7 +292,7 @@ public class MaxMinFlowMultiplexer( parent?.onConverge(now) } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long) { + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { doUpdateCounters(delta) limit = 0.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 7fcc0405..fcee3906 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -28,7 +28,7 @@ import org.opendc.simulator.flow.FlowSource import kotlin.math.min /** - * Helper class to expose an observable [speed] field describing the speed of the consumer. + * Helper class to expose an observable [rate] field describing the flow rate of the source. */ public class FlowSourceRateAdapter( private val delegate: FlowSource, @@ -37,7 +37,7 @@ public class FlowSourceRateAdapter( /** * The resource processing speed at this instant. */ - public var speed: Double = 0.0 + public var rate: Double = 0.0 private set(value) { if (field != value) { callback(value) @@ -50,33 +50,37 @@ public class FlowSourceRateAdapter( } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return delegate.onPull(conn, now, delta) + return try { + delegate.onPull(conn, now, delta) + } catch (cause: Throwable) { + rate = 0.0 + throw cause + } } override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - val oldSpeed = speed + val oldSpeed = rate - delegate.onEvent(conn, now, event) + try { + delegate.onEvent(conn, now, event) - when (event) { - FlowEvent.Converge -> speed = conn.rate - FlowEvent.Capacity -> { - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == speed) { - speed = min(conn.capacity, speed) + when (event) { + FlowEvent.Converge -> rate = conn.rate + FlowEvent.Capacity -> { + // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might + // need to update the current speed. + if (oldSpeed == rate) { + rate = min(conn.capacity, rate) + } } + FlowEvent.Exit -> rate = 0.0 + else -> {} } - FlowEvent.Exit -> speed = 0.0 - else -> {} + } catch (cause: Throwable) { + rate = 0.0 + throw cause } } - override fun onFailure(conn: FlowConnection, cause: Throwable) { - speed = 0.0 - - delegate.onFailure(conn, cause) - } - override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index 380fd38a..f1a5cbe4 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow import io.mockk.* -import kotlinx.coroutines.* import org.junit.jupiter.api.* import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.internal.FlowConsumerContextImpl @@ -102,34 +101,4 @@ class FlowConsumerContextTest { verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } } - - @Test - fun testFailureNoInfiniteLoop() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - conn.close() - return Long.MAX_VALUE - } - - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - if (event == FlowEvent.Exit) throw IllegalStateException("onEvent") - } - - override fun onFailure(conn: FlowConnection, cause: Throwable) { - throw IllegalStateException("onFailure") - } - }) - - val logic = object : FlowConsumerLogic {} - - val context = FlowConsumerContextImpl(engine, consumer, logic) - - context.start() - - delay(1) - - verify(exactly = 1) { consumer.onFailure(any(), any()) } - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index cbc48a4e..d125c638 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -219,4 +219,100 @@ internal class FlowForwarderTest { assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } assertEquals(2000, clock.millis()) } + + @Test + fun testCoupledExit() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + forwarder.consume(FixedFlowSource(2000.0, 1.0)) + + yield() + + assertFalse(source.isActive) + } + + @Test + fun testPullFailureCoupled() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine, isCoupled = true) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + try { + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + throw IllegalStateException("Test") + } + }) + } catch (cause: Throwable) { + // Ignore + } + + yield() + + assertFalse(source.isActive) + } + + @Test + fun testEventFailure() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + try { + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + throw IllegalStateException("Test") + } + }) + } catch (cause: Throwable) { + // Ignore + } + + yield() + + assertTrue(source.isActive) + source.cancel() + } + + @Test + fun testEventConvergeFailure() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + val forwarder = FlowForwarder(engine) + val source = FlowSink(engine, 2000.0) + + launch { source.consume(forwarder) } + + try { + forwarder.consume(object : FlowSource { + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return Long.MAX_VALUE + } + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + if (event == FlowEvent.Converge) { + throw IllegalStateException("Test") + } + } + }) + } catch (cause: Throwable) { + // Ignore + } + + yield() + + assertTrue(source.isActive) + source.cancel() + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt new file mode 100644 index 00000000..c8627446 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +internal class ExclusiveFlowMultiplexerTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val speed = mutableListOf() + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) + source.startConsumer(adapter) + switch.addOutput(forwarder) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = FixedFlowSource(duration * 3.2, 1.0) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : FlowSource { + var isFirst = true + + override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + when (event) { + FlowEvent.Start -> isFirst = true + else -> {} + } + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + val provider = switch.newInput() + provider.consume(workload) + yield() + provider.consume(workload) + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + switch.addOutput(source) + + switch.newInput() + assertThrows { switch.newInput() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt new file mode 100644 index 00000000..9f6b8a2c --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.yield +import org.junit.jupiter.api.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.FlowSink +import org.opendc.simulator.flow.consume +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [FlowMultiplexer] implementations + */ +internal class MaxMinFlowMultiplexerTest { + @Test + fun testSmoke() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + val switch = MaxMinFlowMultiplexer(scheduler) + + val sources = List(2) { FlowSink(scheduler, 2000.0) } + sources.forEach { switch.addOutput(it) } + + val provider = switch.newInput() + val consumer = FixedFlowSource(2000.0, 1.0) + + try { + provider.consume(consumer) + yield() + } finally { + switch.clear() + } + } + + /** + * Test overcommitting of resources via the hypervisor with a single VM. + */ + @Test + fun testOvercommittedSingle() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val provider = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + provider.consume(workload) + yield() + } finally { + switch.clear() + } + + assertAll( + { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, + { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } + + /** + * Test overcommitting of resources via the hypervisor with two VMs. + */ + @Test + fun testOvercommittedDual() = runBlockingSimulation { + val scheduler = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L + val workloadA = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + val workloadB = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3100.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 73.0) + ) + ) + + val switch = MaxMinFlowMultiplexer(scheduler) + val providerA = switch.newInput() + val providerB = switch.newInput() + + try { + switch.addOutput(FlowSink(scheduler, 3200.0)) + + coroutineScope { + launch { providerA.consume(workloadA) } + providerB.consume(workloadB) + } + + yield() + } finally { + switch.clear() + } + assertAll( + { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, + { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, + { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(1200000, clock.millis()) } + ) + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt deleted file mode 100644 index b503087e..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchExclusiveTest.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class SimResourceSwitchExclusiveTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - switch.addOutput(forwarder) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> isFirst = true - else -> {} - } - } - - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - switch.newInput() - assertThrows { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt deleted file mode 100644 index 089a8d78..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/SimResourceSwitchMaxMinTest.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.* -import org.junit.jupiter.api.Assertions.assertEquals -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.TraceFlowSource - -/** - * Test suite for the [FlowMultiplexer] implementations - */ -internal class SimResourceSwitchMaxMinTest { - @Test - fun testSmoke() = runBlockingSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(scheduler) - - val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } - - val provider = switch.newInput() - val consumer = FixedFlowSource(2000.0, 1.0) - - try { - provider.consume(consumer) - yield() - } finally { - switch.clear() - } - } - - /** - * Test overcommitting of resources via the hypervisor with a single VM. - */ - @Test - fun testOvercommittedSingle() = runBlockingSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val provider = switch.newInput() - - try { - switch.addOutput(FlowSink(scheduler, 3200.0)) - provider.consume(workload) - yield() - } finally { - switch.clear() - } - - assertAll( - { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } - - /** - * Test overcommitting of resources via the hypervisor with two VMs. - */ - @Test - fun testOvercommittedDual() = runBlockingSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workloadA = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - val workloadB = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3100.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 73.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val providerA = switch.newInput() - val providerB = switch.newInput() - - try { - switch.addOutput(FlowSink(scheduler, 3200.0)) - - coroutineScope { - launch { providerA.consume(workloadA) } - providerB.consume(workloadB) - } - - yield() - } finally { - switch.clear() - } - assertAll( - { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } -} -- cgit v1.2.3 From 4f5a1f88d0c6aa19ce4cab0ec7b9b13a24c92fbe Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 14:45:42 +0200 Subject: refactor(simulator): Remove capacity event This change removes the Capacity entry from FlowEvent. Since the source is always pulled on a capacity change, we do not need a separate event for this. --- .../kotlin/org/opendc/simulator/flow/FlowEvent.kt | 5 ----- .../flow/internal/FlowConsumerContextImpl.kt | 23 +++++----------------- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 12 ++++++++--- .../simulator/flow/source/FlowSourceRateAdapter.kt | 10 ---------- .../simulator/flow/FlowConsumerContextTest.kt | 2 +- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 2 +- .../org/opendc/simulator/flow/FlowSinkTest.kt | 2 +- 7 files changed, 17 insertions(+), 39 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt index 14c85183..bb6f25b1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt @@ -40,9 +40,4 @@ public enum class FlowEvent { * This event is emitted to the source when the system has converged into a steady state. */ Converge, - - /** - * This event is emitted to the source when the capacity of the consumer has changed. - */ - Capacity, } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index a74f89b4..a86ed6ea 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -51,7 +51,11 @@ internal class FlowConsumerContextImpl( // Only changes will be propagated if (value != oldValue) { field = value - onCapacityChange() + + // Do not pull the source if it has not been started yet + if (_state == State.Active) { + pull() + } } } @@ -327,23 +331,6 @@ internal class FlowConsumerContextImpl( } } - /** - * Indicate that the capacity of the resource has changed. - */ - private fun onCapacityChange() { - // Do not inform the consumer if it has not been started yet - if (_state != State.Active) { - return - } - - engine.batch { - // Inform the consumer of the capacity change. This might already trigger an interrupt. - source.onEvent(this, _clock.millis(), FlowEvent.Capacity) - - pull() - } - } - /** * Schedule an immediate update for this connection. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index a3e108f6..b98cf2f1 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -356,8 +356,7 @@ public class MaxMinFlowMultiplexer( /** * The capacity of this output. */ - val capacity: Double - get() = _ctx?.capacity ?: 0.0 + @JvmField var capacity: Double = 0.0 /** * Push the specified rate to the consumer. @@ -374,6 +373,12 @@ public class MaxMinFlowMultiplexer( } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val capacity = capacity + if (capacity != conn.capacity) { + this.capacity = capacity + updateCapacity() + } + runScheduler(now) return Long.MAX_VALUE } @@ -383,13 +388,14 @@ public class MaxMinFlowMultiplexer( FlowEvent.Start -> { assert(_ctx == null) { "Source running concurrently" } _ctx = conn + capacity = conn.capacity updateCapacity() } FlowEvent.Exit -> { _ctx = null + capacity = 0.0 updateCapacity() } - FlowEvent.Capacity -> updateCapacity() else -> {} } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index fcee3906..24ae64cb 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.source import org.opendc.simulator.flow.FlowConnection import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource -import kotlin.math.min /** * Helper class to expose an observable [rate] field describing the flow rate of the source. @@ -59,20 +58,11 @@ public class FlowSourceRateAdapter( } override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - val oldSpeed = rate - try { delegate.onEvent(conn, now, event) when (event) { FlowEvent.Converge -> rate = conn.rate - FlowEvent.Capacity -> { - // Check if the consumer interrupted the consumer and updated the resource consumption. If not, we might - // need to update the current speed. - if (oldSpeed == rate) { - rate = min(conn.capacity, rate) - } - } FlowEvent.Exit -> rate = 0.0 else -> {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt index f1a5cbe4..fe39eb2c 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt @@ -99,6 +99,6 @@ class FlowConsumerContextTest { context.start() context.capacity = 4200.0 - verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + verify(exactly = 1) { consumer.onPull(any(), any(), any()) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d125c638..7fae918a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -197,7 +197,7 @@ internal class FlowForwarderTest { } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + verify(exactly = 1) { consumer.onPull(any(), any(), any()) } } @Test diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 010a985e..5d579e5d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -67,7 +67,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Capacity) } + verify(exactly = 1) { consumer.onPull(any(), any(), any()) } } @Test -- cgit v1.2.3 From a2ce07026bf3ef17326e72f395dfa2dd9d9b17be Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 15:37:35 +0200 Subject: refactor(simulator): Create separate callbacks for remaining events This change creates separate callbacks for the remaining events: onStart, onStop and onConverge. --- .../org/opendc/simulator/flow/FlowConsumer.kt | 30 ++++++++-- .../kotlin/org/opendc/simulator/flow/FlowEvent.kt | 43 -------------- .../org/opendc/simulator/flow/FlowForwarder.kt | 65 ++++++++++++---------- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 25 +++++++-- .../flow/internal/FlowConsumerContextImpl.kt | 53 +++++++++++------- .../flow/mux/ForwardingFlowMultiplexer.kt | 9 +-- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 30 +++++----- .../simulator/flow/source/FlowSourceRateAdapter.kt | 29 ++++------ .../simulator/flow/source/TraceFlowSource.kt | 25 ++++----- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 39 +++++++------ .../org/opendc/simulator/flow/FlowSinkTest.kt | 25 +++++---- .../flow/mux/ExclusiveFlowMultiplexerTest.kt | 7 +-- 12 files changed, 186 insertions(+), 194 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt index df2c4fab..4685a755 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt @@ -83,20 +83,20 @@ public interface FlowConsumer { public suspend fun FlowConsumer.consume(source: FlowSource) { return suspendCancellableCoroutine { cont -> startConsumer(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return try { - source.onPull(conn, now, delta) + override fun onStart(conn: FlowConnection, now: Long) { + try { + source.onStart(conn, now) } catch (cause: Throwable) { cont.resumeWithException(cause) throw cause } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { try { - source.onEvent(conn, now, event) + source.onStop(conn, now, delta) - if (event == FlowEvent.Exit && !cont.isCompleted) { + if (!cont.isCompleted) { cont.resume(Unit) } } catch (cause: Throwable) { @@ -105,6 +105,24 @@ public suspend fun FlowConsumer.consume(source: FlowSource) { } } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return try { + source.onPull(conn, now, delta) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + try { + source.onConverge(conn, now, delta) + } catch (cause: Throwable) { + cont.resumeWithException(cause) + throw cause + } + } + override fun toString(): String = "SuspendingFlowSource" }) diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt deleted file mode 100644 index bb6f25b1..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEvent.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A flow event that is communicated to a [FlowSource]. - */ -public enum class FlowEvent { - /** - * This event is emitted to the source when it has started. - */ - Start, - - /** - * This event is emitted to the source when it is stopped. - */ - Exit, - - /** - * This event is emitted to the source when the system has converged into a steady state. - */ - Converge, -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index bc01a11b..ab5b31c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging import org.opendc.simulator.flow.internal.FlowCountersImpl +import kotlin.math.max /** * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. @@ -64,6 +65,8 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled _innerCtx?.pull() } + @JvmField var lastPull = Long.MAX_VALUE + override fun push(rate: Double) { if (delegate == null) { return @@ -82,7 +85,9 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled reset() if (hasDelegateStarted) { - delegate.onEvent(this, engine.clock.millis(), FlowEvent.Exit) + val now = engine.clock.millis() + val delta = max(0, now - lastPull) + delegate.onStop(this, now, delta) } } } @@ -134,6 +139,25 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } } + override fun onStart(conn: FlowConnection, now: Long) { + _innerCtx = conn + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _innerCtx = null + + val delegate = delegate + if (delegate != null) { + reset() + + try { + delegate.onStop(this._ctx, now, delta) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + } + } + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val delegate = delegate @@ -141,10 +165,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled start() } + _ctx.lastPull = now updateCounters(conn, delta) return try { - delegate?.onPull(this._ctx, now, delta) ?: Long.MAX_VALUE + delegate?.onPull(_ctx, now, delta) ?: Long.MAX_VALUE } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } @@ -153,32 +178,14 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> _innerCtx = conn - FlowEvent.Exit -> { - _innerCtx = null - - val delegate = delegate - if (delegate != null) { - reset() - - try { - delegate.onEvent(this._ctx, now, FlowEvent.Exit) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - } - } - } - else -> - try { - delegate?.onEvent(this._ctx, now, event) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - - _innerCtx = null - reset() - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate?.onConverge(this._ctx, now, delta) + } catch (cause: Throwable) { + logger.error(cause) { "Uncaught exception" } + + _innerCtx = null + reset() } } @@ -189,7 +196,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val delegate = delegate ?: return try { - delegate.onEvent(_ctx, engine.clock.millis(), FlowEvent.Start) + delegate.onStart(_ctx, engine.clock.millis()) hasDelegateStarted = true } catch (cause: Throwable) { logger.error(cause) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index 70687b4f..a4f624ef 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -29,6 +29,23 @@ package org.opendc.simulator.flow * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. */ public interface FlowSource { + /** + * This method is invoked when the source is started. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the provider finished. + */ + public fun onStart(conn: FlowConnection, now: Long) {} + + /** + * This method is invoked when the source is finished. + * + * @param conn The connection between the source and consumer. + * @param now The virtual timestamp in milliseconds at which the source finished. + * @param delta The virtual duration between this call and the last call to [onPull] in milliseconds. + */ + public fun onStop(conn: FlowConnection, now: Long, delta: Long) {} + /** * This method is invoked when the source is pulled. * @@ -40,11 +57,11 @@ public interface FlowSource { public fun onPull(conn: FlowConnection, now: Long, delta: Long): Long /** - * This method is invoked when an event has occurred. + * This method is invoked when the flow graph has converged into a steady-state system. * * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the event is occurring. - * @param event The event that has occurred. + * @param now The virtual timestamp in milliseconds at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. */ - public fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) {} + public fun onConverge(conn: FlowConnection, now: Long, delta: Long) {} } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index a86ed6ea..c235b9ae 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -125,7 +125,7 @@ internal class FlowConsumerContextImpl( override fun start() { check(_state == State.Pending) { "Consumer is already started" } engine.batch { - source.onEvent(this, _clock.millis(), FlowEvent.Start) + source.onStart(this, _clock.millis()) _state = State.Active pull() } @@ -140,8 +140,7 @@ internal class FlowConsumerContextImpl( _state = State.Closed if (!_isUpdateActive) { val now = _clock.millis() - val delta = max(0, now - _lastPull) - doStopSource(now, delta) + doStopSource(now) // FIX: Make sure the context converges pull() @@ -210,19 +209,16 @@ internal class FlowConsumerContextImpl( _isUpdateActive = true _isImmediateUpdateScheduled = false - val lastPush = _lastPush - val pushDelta = max(0, now - lastPush) - try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired val deadline = if (_isPulled || _deadline == now) { val lastPull = _lastPull - val pullDelta = max(0, now - lastPull) + val delta = max(0, now - lastPull) _isPulled = false _lastPull = now - val duration = source.onPull(this, now, pullDelta) + val duration = source.onPull(this, now, delta) if (duration != Long.MAX_VALUE) now + duration else @@ -236,12 +232,15 @@ internal class FlowConsumerContextImpl( State.Active -> { val demand = _demand if (demand != _activeDemand) { + val lastPush = _lastPush + val delta = max(0, now - lastPush) + _lastPush = now - logic.onPush(this, now, pushDelta, demand) + logic.onPush(this, now, delta, demand) } } - State.Closed -> doStopSource(now, pushDelta) + State.Closed -> doStopSource(now) State.Pending -> throw IllegalStateException("Illegal transition to pending state") } @@ -256,7 +255,7 @@ internal class FlowConsumerContextImpl( // Schedule an update at the new deadline scheduleDelayed(now, deadline) } catch (cause: Throwable) { - doFailSource(now, pushDelta, cause) + doFailSource(now, cause) } finally { _isUpdateActive = false } @@ -293,12 +292,12 @@ internal class FlowConsumerContextImpl( try { if (_state == State.Active) { - source.onEvent(this, timestamp, FlowEvent.Converge) + source.onConverge(this, timestamp, delta) } logic.onConverge(this, timestamp, delta) } catch (cause: Throwable) { - doFailSource(timestamp, max(0, timestamp - _lastPull), cause) + doFailSource(timestamp, cause) } } @@ -307,12 +306,13 @@ internal class FlowConsumerContextImpl( /** * Stop the [FlowSource]. */ - private fun doStopSource(now: Long, delta: Long) { + private fun doStopSource(now: Long) { try { - source.onEvent(this, now, FlowEvent.Exit) - logic.onFinish(this, now, delta, null) + source.onStop(this, now, max(0, now - _lastPull)) + doFinishConsumer(now, null) } catch (cause: Throwable) { - doFailSource(now, delta, cause) + doFinishConsumer(now, cause) + return } finally { _deadline = Long.MAX_VALUE _demand = 0.0 @@ -322,9 +322,24 @@ internal class FlowConsumerContextImpl( /** * Fail the [FlowSource]. */ - private fun doFailSource(now: Long, delta: Long, cause: Throwable) { + private fun doFailSource(now: Long, cause: Throwable) { + try { + source.onStop(this, now, max(0, now - _lastPull)) + } catch (e: Throwable) { + e.addSuppressed(cause) + doFinishConsumer(now, e) + } finally { + _deadline = Long.MAX_VALUE + _demand = 0.0 + } + } + + /** + * Finish the consumer. + */ + private fun doFinishConsumer(now: Long, cause: Throwable?) { try { - logic.onFinish(this, now, delta, cause) + logic.onFinish(this, now, max(0, now - _lastPush), cause) } catch (e: Throwable) { e.addSuppressed(cause) logger.error(e) { "Uncaught exception" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 811d9460..6dd9dcfb 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -88,13 +88,10 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul _availableOutputs += forwarder output.startConsumer(object : FlowSource by forwarder { - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - if (event == FlowEvent.Exit) { - // De-register the output after it has finished - _outputs -= output - } + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _outputs -= output - forwarder.onEvent(conn, now, event) + forwarder.onStop(conn, now, delta) } }) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index b98cf2f1..c7379fa9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -372,6 +372,19 @@ public class MaxMinFlowMultiplexer( provider.cancel() } + override fun onStart(conn: FlowConnection, now: Long) { + assert(_ctx == null) { "Source running concurrently" } + _ctx = conn + capacity = conn.capacity + updateCapacity() + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _ctx = null + capacity = 0.0 + updateCapacity() + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val capacity = capacity if (capacity != conn.capacity) { @@ -383,23 +396,6 @@ public class MaxMinFlowMultiplexer( return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn - capacity = conn.capacity - updateCapacity() - } - FlowEvent.Exit -> { - _ctx = null - capacity = 0.0 - updateCapacity() - } - else -> {} - } - } - override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 24ae64cb..0c39523f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.source import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource /** @@ -48,28 +47,22 @@ public class FlowSourceRateAdapter( callback(0.0) } - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return try { - delegate.onPull(conn, now, delta) - } catch (cause: Throwable) { + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + try { + delegate.onStop(conn, now, delta) + } finally { rate = 0.0 - throw cause } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - try { - delegate.onEvent(conn, now, event) + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return delegate.onPull(conn, now, delta) + } - when (event) { - FlowEvent.Converge -> rate = conn.rate - FlowEvent.Exit -> rate = 0.0 - else -> {} - } - } catch (cause: Throwable) { - rate = 0.0 - throw cause - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + delegate.onConverge(conn, now, delta) + + rate = conn.rate } override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt index 4d3ae61a..ae537845 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.source import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowEvent import org.opendc.simulator.flow.FlowSource /** @@ -33,6 +32,15 @@ public class TraceFlowSource(private val trace: Sequence) : FlowSource private var _iterator: Iterator? = null private var _nextTarget = Long.MIN_VALUE + override fun onStart(conn: FlowConnection, now: Long) { + check(_iterator == null) { "Source already running" } + _iterator = trace.iterator() + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + _iterator = null + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { // Check whether the trace fragment was fully consumed, otherwise wait until we have done so val nextTarget = _nextTarget @@ -52,21 +60,8 @@ public class TraceFlowSource(private val trace: Sequence) : FlowSource } } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> { - check(_iterator == null) { "Source already running" } - _iterator = trace.iterator() - } - FlowEvent.Exit -> { - _iterator = null - } - else -> {} - } - } - /** - * A fragment of the tgrace. + * A fragment of the trace. */ public data class Fragment(val duration: Long, val usage: Double) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 7fae918a..3690e681 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -22,10 +22,10 @@ package org.opendc.simulator.flow -import io.mockk.spyk -import io.mockk.verify +import io.mockk.* import kotlinx.coroutines.* import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation @@ -122,7 +122,7 @@ internal class FlowForwarderTest { forwarder.startConsumer(consumer) forwarder.cancel() - verify(exactly = 0) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify(exactly = 0) { consumer.onStop(any(), any(), any()) } } @Test @@ -139,8 +139,8 @@ internal class FlowForwarderTest { yield() forwarder.cancel() - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify(exactly = 1) { consumer.onStart(any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any(), any()) } } @Test @@ -157,8 +157,8 @@ internal class FlowForwarderTest { yield() source.cancel() - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Start) } - verify(exactly = 1) { consumer.onEvent(any(), any(), FlowEvent.Exit) } + verify(exactly = 1) { consumer.onStart(any(), any()) } + verify(exactly = 1) { consumer.onStop(any(), any(), any()) } } @Test @@ -182,22 +182,23 @@ internal class FlowForwarderTest { } @Test + @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368 fun testAdjustCapacity() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 1.0) + val sink = FlowSink(engine, 1.0) - val consumer = spyk(FixedFlowSource(2.0, 1.0)) - source.startConsumer(forwarder) + val source = spyk(FixedFlowSource(2.0, 1.0)) + sink.startConsumer(forwarder) coroutineScope { - launch { forwarder.consume(consumer) } + launch { forwarder.consume(source) } delay(1000) - source.capacity = 0.5 + sink.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 1) { source.onPull(any(), any(), any()) } } @Test @@ -259,7 +260,7 @@ internal class FlowForwarderTest { } @Test - fun testEventFailure() = runBlockingSimulation { + fun testStartFailure() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val source = FlowSink(engine, 2000.0) @@ -272,7 +273,7 @@ internal class FlowForwarderTest { return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + override fun onStart(conn: FlowConnection, now: Long) { throw IllegalStateException("Test") } }) @@ -287,7 +288,7 @@ internal class FlowForwarderTest { } @Test - fun testEventConvergeFailure() = runBlockingSimulation { + fun testConvergeFailure() = runBlockingSimulation { val engine = FlowEngineImpl(coroutineContext, clock) val forwarder = FlowForwarder(engine) val source = FlowSink(engine, 2000.0) @@ -300,10 +301,8 @@ internal class FlowForwarderTest { return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - if (event == FlowEvent.Converge) { - throw IllegalStateException("Test") - } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + throw IllegalStateException("Test") } }) } catch (cause: Throwable) { diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt index 5d579e5d..70c75864 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt @@ -22,8 +22,6 @@ package org.opendc.simulator.flow -import io.mockk.every -import io.mockk.mockk import io.mockk.spyk import io.mockk.verify import kotlinx.coroutines.* @@ -67,7 +65,7 @@ internal class FlowSinkTest { provider.capacity = 0.5 } assertEquals(3000, clock.millis()) - verify(exactly = 1) { consumer.onPull(any(), any(), any()) } + verify(exactly = 3) { consumer.onPull(any(), any(), any()) } } @Test @@ -102,7 +100,7 @@ internal class FlowSinkTest { return Long.MAX_VALUE } - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { + override fun onStart(conn: FlowConnection, now: Long) { conn.pull() } } @@ -120,11 +118,8 @@ internal class FlowSinkTest { val consumer = object : FlowSource { var isFirst = true - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> resCtx = conn - else -> {} - } + override fun onStart(conn: FlowConnection, now: Long) { + resCtx = conn } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -154,9 +149,15 @@ internal class FlowSinkTest { val capacity = 4200.0 val provider = FlowSink(engine, capacity) - val consumer = mockk(relaxUnitFun = true) - every { consumer.onEvent(any(), any(), eq(FlowEvent.Start)) } - .throws(IllegalStateException()) + val consumer = object : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + throw IllegalStateException("Hi") + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return Long.MAX_VALUE + } + } assertThrows { provider.consume(consumer) diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt index c8627446..3475f027 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt @@ -108,11 +108,8 @@ internal class ExclusiveFlowMultiplexerTest { val workload = object : FlowSource { var isFirst = true - override fun onEvent(conn: FlowConnection, now: Long, event: FlowEvent) { - when (event) { - FlowEvent.Start -> isFirst = true - else -> {} - } + override fun onStart(conn: FlowConnection, now: Long) { + isFirst = true } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { -- cgit v1.2.3 From 94783ff9d8cd81275fefd5804ac99f98e2dee3a4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 16:00:05 +0200 Subject: fix(simulator): Fix loss computation for UPS and PDU This change fixes the loss computation for both the UPS and PDU implementation that was broken due to the new pushing mechanism. We implement a new class FlowMapper that can be used to map the flow pushed by a `FlowSource` using a user-specified method. --- .../kotlin/org/opendc/simulator/flow/FlowMapper.kt | 75 ++++++++++++++++++++++ .../flow/internal/FlowConsumerContextImpl.kt | 1 + 2 files changed, 76 insertions(+) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt new file mode 100644 index 00000000..6867bcef --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A [FlowConsumer] that maps the pushed flow through [transform]. + * + * @param source The source of the flow. + * @param transform The method to transform the flow. + */ +public class FlowMapper( + private val source: FlowSource, + private val transform: (FlowConnection, Double) -> Double +) : FlowSource { + + /** + * The current active connection. + */ + private var _conn: Connection? = null + + override fun onStart(conn: FlowConnection, now: Long) { + check(_conn == null) { "Concurrent access" } + val delegate = Connection(conn, transform) + _conn = delegate + source.onStart(delegate, now) + } + + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + val delegate = checkNotNull(_conn) { "Invariant violation" } + _conn = null + source.onStop(delegate, now, delta) + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + val delegate = checkNotNull(_conn) { "Invariant violation" } + return source.onPull(delegate, now, delta) + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + val delegate = _conn ?: return + source.onConverge(delegate, now, delta) + } + + /** + * The wrapper [FlowConnection] that is used to transform the flow. + */ + private class Connection( + private val delegate: FlowConnection, + private val transform: (FlowConnection, Double) -> Double + ) : FlowConnection by delegate { + override fun push(rate: Double) { + delegate.push(transform(this, rate)) + } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index c235b9ae..55fa92df 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -127,6 +127,7 @@ internal class FlowConsumerContextImpl( engine.batch { source.onStart(this, _clock.millis()) _state = State.Active + pull() } } -- cgit v1.2.3 From 559ac2327b8aa319fb8ab4558d4f4aa3382349f4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 30 Sep 2021 16:27:45 +0200 Subject: perf(simulator): Make convergence callback optional This change adds two new properties for controlling whether the convergence callbacks of the source and consumer respectively should be invoked. This saves a lot of unnecessary calls for stages that do not have any implementation of the `onConvergence` method. --- .../org/opendc/simulator/flow/FlowConnection.kt | 5 +++ .../opendc/simulator/flow/FlowConsumerContext.kt | 5 +++ .../org/opendc/simulator/flow/FlowConsumerLogic.kt | 3 ++ .../simulator/flow/FlowConvergenceListener.kt | 36 ++++++++++++++++++ .../org/opendc/simulator/flow/FlowForwarder.kt | 10 +++++ .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 11 +++++- .../kotlin/org/opendc/simulator/flow/FlowSource.kt | 3 ++ .../kotlin/org/opendc/simulator/flow/FlowSystem.kt | 43 ---------------------- .../flow/internal/FlowConsumerContextImpl.kt | 39 +++++++++++++++----- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 9 ++++- .../simulator/flow/source/FlowSourceRateAdapter.kt | 14 +++++-- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 4 ++ 12 files changed, 122 insertions(+), 60 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index fa833961..c327e1e9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -41,6 +41,11 @@ public interface FlowConnection : AutoCloseable { */ public val demand: Double + /** + * A flag to control whether [FlowSource.onConverge] should be invoked for this source. + */ + public var shouldSourceConverge: Boolean + /** * Pull the source. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index 75b2d25b..15f9b93b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -33,6 +33,11 @@ public interface FlowConsumerContext : FlowConnection { */ public override var capacity: Double + /** + * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer. + */ + public var shouldConsumerConverge: Boolean + /** * Start the flow over the connection. */ diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt index ef94ab22..50fbc9c7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt @@ -39,6 +39,9 @@ public interface FlowConsumerLogic { /** * This method is invoked when the flow graph has converged into a steady-state system. * + * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method + * will not be invoked. + * * @param ctx The context in which the provider runs. * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt new file mode 100644 index 00000000..d1afda6f --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow + +/** + * A listener interface for when a flow stage has converged into a steady-state. + */ +public interface FlowConvergenceListener { + /** + * This method is invoked when the system has converged to a steady-state. + * + * @param now The timestamp at which the system converged. + * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. + */ + public fun onConverge(now: Long, delta: Long) {} +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index ab5b31c2..17de601a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -52,6 +52,12 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled * The exposed [FlowConnection]. */ private val _ctx = object : FlowConnection { + override var shouldSourceConverge: Boolean = false + set(value) { + field = value + _innerCtx?.shouldSourceConverge = value + } + override val capacity: Double get() = _innerCtx?.capacity ?: 0.0 @@ -141,6 +147,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override fun onStart(conn: FlowConnection, now: Long) { _innerCtx = conn + + if (_ctx.shouldSourceConverge) { + conn.shouldSourceConverge = true + } } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index fc590177..549a338b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -32,9 +32,16 @@ package org.opendc.simulator.flow public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, - private val parent: FlowSystem? = null + private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + override fun start(ctx: FlowConsumerContext) { + if (parent != null) { + ctx.shouldConsumerConverge = true + } + super.start(ctx) + } + override fun createLogic(): FlowConsumerLogic { return object : FlowConsumerLogic { override fun onPush( @@ -52,7 +59,7 @@ public class FlowSink( } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now) + parent?.onConverge(now, delta) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt index a4f624ef..3a7e52aa 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt @@ -59,6 +59,9 @@ public interface FlowSource { /** * This method is invoked when the flow graph has converged into a steady-state system. * + * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method + * will not be invoked. + * * @param conn The connection between the source and consumer. * @param now The virtual timestamp in milliseconds at which the system converged. * @param delta The virtual duration between this call and the last call to [onConverge] in milliseconds. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt deleted file mode 100644 index db6aa69f..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSystem.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A system of possible multiple sub-resources. - * - * This interface is used to model hierarchies of resource providers, which can listen efficiently to changes of the - * resource provider. - */ -public interface FlowSystem { - /** - * The parent system to which this system belongs or `null` if it has no parent. - */ - public val parent: FlowSystem? - - /** - * This method is invoked when the system has converged to a steady-state. - * - * @param timestamp The timestamp at which the system converged. - */ - public fun onConverge(timestamp: Long) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 55fa92df..c087a28d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -72,6 +72,12 @@ internal class FlowConsumerContextImpl( override val demand: Double get() = _demand + /** + * Flags to control the convergence of the consumer and source. + */ + override var shouldConsumerConverge: Boolean = false + override var shouldSourceConverge: Boolean = false + /** * The clock to track simulation time. */ @@ -114,7 +120,8 @@ internal class FlowConsumerContextImpl( */ private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` private var _lastPush: Long = Long.MIN_VALUE // Last call to `onPush` - private var _lastConvergence: Long = Long.MAX_VALUE // Last call to `onConvergence` + private var _lastSourceConvergence: Long = Long.MAX_VALUE // Last call to source `onConvergence` + private var _lastConsumerConvergence: Long = Long.MAX_VALUE // Last call to consumer `onConvergence` /** * The timers at which the context is scheduled to be interrupted. @@ -199,8 +206,14 @@ internal class FlowConsumerContextImpl( * @return A flag to indicate whether the connection has already been updated before convergence. */ fun doUpdate(now: Long): Boolean { - val willConverge = _willConverge - _willConverge = true + // The connection will only converge if either the source or the consumer wants the converge callback to be + // invoked. + val shouldConverge = shouldSourceConverge || shouldConsumerConverge + var willConverge = false + if (shouldConverge) { + willConverge = _willConverge + _willConverge = true + } val oldState = _state if (oldState != State.Active) { @@ -286,19 +299,25 @@ internal class FlowConsumerContextImpl( /** * This method is invoked when the system converges into a steady state. */ - fun onConverge(timestamp: Long) { - val delta = max(0, timestamp - _lastConvergence) - _lastConvergence = timestamp + fun onConverge(now: Long) { _willConverge = false try { - if (_state == State.Active) { - source.onConverge(this, timestamp, delta) + if (_state == State.Active && shouldSourceConverge) { + val delta = max(0, now - _lastSourceConvergence) + _lastSourceConvergence = now + + source.onConverge(this, now, delta) } - logic.onConverge(this, timestamp, delta) + if (shouldConsumerConverge) { + val delta = max(0, now - _lastConsumerConvergence) + _lastConsumerConvergence = now + + logic.onConverge(this, now, delta) + } } catch (cause: Throwable) { - doFailSource(timestamp, cause) + doFailSource(now, cause) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index c7379fa9..7232df35 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -38,7 +38,7 @@ import kotlin.math.min */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - private val parent: FlowSystem? = null, + private val parent: FlowConvergenceListener? = null, private val interferenceDomain: InterferenceDomain? = null ) : FlowMultiplexer { /** @@ -269,6 +269,11 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this + + if (parent != null) { + ctx.shouldConsumerConverge = true + } + super.start(ctx) } @@ -289,7 +294,7 @@ public class MaxMinFlowMultiplexer( } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now) + parent?.onConverge(now, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt index 0c39523f..6dd60d95 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt @@ -47,6 +47,12 @@ public class FlowSourceRateAdapter( callback(0.0) } + override fun onStart(conn: FlowConnection, now: Long) { + conn.shouldSourceConverge = true + + delegate.onStart(conn, now) + } + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { try { delegate.onStop(conn, now, delta) @@ -60,9 +66,11 @@ public class FlowSourceRateAdapter( } override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { - delegate.onConverge(conn, now, delta) - - rate = conn.rate + try { + delegate.onConverge(conn, now, delta) + } finally { + rate = conn.rate + } } override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index 3690e681..d548451f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -297,6 +297,10 @@ internal class FlowForwarderTest { try { forwarder.consume(object : FlowSource { + override fun onStart(conn: FlowConnection, now: Long) { + conn.shouldSourceConverge = true + } + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { return Long.MAX_VALUE } -- cgit v1.2.3 From de8509b50d5acb7e739eb5cb4d29b03a627c8985 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 1 Oct 2021 11:40:21 +0200 Subject: perf(simulator): Reduce field accesses in FlowConsumerContextImpl This change updates the implementation of FlowConsumerContextImpl to reduce the number of field accesses by storing the flags of the connection inside a single integer. --- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 2 + .../org/opendc/simulator/flow/internal/Flags.kt | 43 +++ .../flow/internal/FlowConsumerContextImpl.kt | 338 ++++++++++----------- .../simulator/flow/internal/FlowEngineImpl.kt | 112 ++----- 4 files changed, 241 insertions(+), 254 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index 549a338b..b4eb6a38 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -44,6 +44,8 @@ public class FlowSink( override fun createLogic(): FlowConsumerLogic { return object : FlowConsumerLogic { + private val parent = this@FlowSink.parent + override fun onPush( ctx: FlowConsumerContext, now: Long, diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt new file mode 100644 index 00000000..81ed9f26 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * States of the flow connection. + */ +internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source +internal const val ConnActive = 1 // Connection is active and the source is currently being consumed +internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore +internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection + +/** + * Flags of the flow connection + */ +internal const val ConnPulled = 1 shl 2 // The source should be pulled +internal const val ConnPushed = 1 shl 3 // The source has pushed a value +internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active +internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending +internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary +internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending +internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source +internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index c087a28d..9d36483e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -24,7 +24,7 @@ package org.opendc.simulator.flow.internal import mu.KotlinLogging import org.opendc.simulator.flow.* -import java.util.ArrayDeque +import java.util.* import kotlin.math.max import kotlin.math.min @@ -44,20 +44,18 @@ internal class FlowConsumerContextImpl( /** * The capacity of the connection. */ - override var capacity: Double = 0.0 + override var capacity: Double + get() = _capacity set(value) { - val oldValue = field + val oldValue = _capacity // Only changes will be propagated if (value != oldValue) { - field = value - - // Do not pull the source if it has not been started yet - if (_state == State.Active) { - pull() - } + _capacity = value + pull() } } + private var _capacity: Double = 0.0 /** * The current processing rate of the connection. @@ -75,45 +73,41 @@ internal class FlowConsumerContextImpl( /** * Flags to control the convergence of the consumer and source. */ - override var shouldConsumerConverge: Boolean = false override var shouldSourceConverge: Boolean = false + set(value) { + field = value + _flags = + if (value) + _flags or ConnConvergeSource + else + _flags and ConnConvergeSource.inv() + } + override var shouldConsumerConverge: Boolean = false + set(value) { + field = value + + _flags = + if (value) + _flags or ConnConvergeConsumer + else + _flags and ConnConvergeConsumer.inv() + } /** * The clock to track simulation time. */ private val _clock = engine.clock - /** - * A flag to indicate the state of the connection. - */ - private var _state = State.Pending - /** * The current state of the connection. */ private var _demand: Double = 0.0 // The current (pending) demand of the source - private var _activeDemand: Double = 0.0 // The previous demand of the source private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer /** - * A flag to indicate that the source should be pulled. - */ - private var _isPulled = false - - /** - * A flag to indicate that an update is active. - */ - private var _isUpdateActive = false - - /** - * A flag to indicate that an immediate update is scheduled. + * The flags of the flow connection, indicating certain actions. */ - private var _isImmediateUpdateScheduled = false - - /** - * A flag that indicates to the [FlowEngine] that the context is already enqueued to converge. - */ - private var _willConverge: Boolean = false + private var _flags: Int = 0 /** * The timestamp of calls to the callbacks. @@ -130,44 +124,53 @@ internal class FlowConsumerContextImpl( private val _pendingTimers: ArrayDeque = ArrayDeque(5) override fun start() { - check(_state == State.Pending) { "Consumer is already started" } + check(_flags and ConnState == ConnPending) { "Consumer is already started" } engine.batch { - source.onStart(this, _clock.millis()) - _state = State.Active + val now = _clock.millis() + source.onStart(this, now) - pull() + // Mark the connection as active and pulled + val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled + scheduleImmediate(now, newFlags) } } override fun close() { - if (_state == State.Closed) { + var flags = _flags + if (flags and ConnState == ConnClosed) { return } engine.batch { - _state = State.Closed - if (!_isUpdateActive) { + // Mark the connection as closed and pulled (in order to converge) + flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled + _flags = flags + + if (flags and ConnUpdateActive == 0) { val now = _clock.millis() doStopSource(now) // FIX: Make sure the context converges - pull() + scheduleImmediate(now, flags) } } } override fun pull() { - if (_state == State.Closed) { + val flags = _flags + if (flags and ConnState != ConnActive) { return } - _isPulled = true - scheduleImmediate() + // Mark connection as pulled + scheduleImmediate(_clock.millis(), flags or ConnPulled) } override fun flush() { + val flags = _flags + // Do not attempt to flush the connection if the connection is closed or an update is already active - if (_state == State.Closed || _isUpdateActive) { + if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) { return } @@ -181,118 +184,145 @@ internal class FlowConsumerContextImpl( _demand = rate - // Invalidate only if the active demand is changed and no update is active - // If an update is active, it will already get picked up at the end of the update - if (_activeDemand != rate && !_isUpdateActive) { - scheduleImmediate() - } - } + val flags = _flags - /** - * Determine whether the state of the flow connection should be updated. - */ - fun shouldUpdate(timestamp: Long): Boolean { - // The flow connection should be updated for three reasons: - // (1) The source should be pulled (after a call to `pull`) - // (2) The demand of the source has changed (after a call to `push`) - // (3) The timer of the source expired - return _isPulled || _demand != _activeDemand || _deadline == timestamp + if (flags and ConnUpdateActive != 0) { + // If an update is active, it will already get picked up at the end of the update + _flags = flags or ConnPushed + } else { + // Invalidate only if no update is active + scheduleImmediate(_clock.millis(), flags or ConnPushed) + } } /** * Update the state of the flow connection. * * @param now The current virtual timestamp. - * @return A flag to indicate whether the connection has already been updated before convergence. + * @param visited The queue of connections that have been visited during the cycle. + * @param timerQueue The queue of all pending timers. + * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update. */ - fun doUpdate(now: Long): Boolean { - // The connection will only converge if either the source or the consumer wants the converge callback to be - // invoked. - val shouldConverge = shouldSourceConverge || shouldConsumerConverge - var willConverge = false - if (shouldConverge) { - willConverge = _willConverge - _willConverge = true - } - - val oldState = _state - if (oldState != State.Active) { - return willConverge + fun doUpdate( + now: Long, + visited: ArrayDeque, + timerQueue: PriorityQueue, + isImmediate: Boolean + ) { + var flags = _flags + + // Precondition: The flow connection must be active + if (flags and ConnState != ConnActive) { + return } - _isUpdateActive = true - _isImmediateUpdateScheduled = false + val deadline = _deadline + val reachedDeadline = deadline == now + var newDeadline = deadline + var hasUpdated = false try { // Pull the source if (1) `pull` is called or (2) the timer of the source has expired - val deadline = if (_isPulled || _deadline == now) { + newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { val lastPull = _lastPull val delta = max(0, now - lastPull) - _isPulled = false + // Update state before calling into the outside world, so it observes a consistent state _lastPull = now + _flags = (flags and ConnPulled.inv()) or ConnUpdateActive + hasUpdated = true val duration = source.onPull(this, now, delta) + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = _flags + if (duration != Long.MAX_VALUE) now + duration else duration } else { - _deadline + deadline } - // Check whether the state has changed after [consumer.onNext] - when (_state) { - State.Active -> { - val demand = _demand - if (demand != _activeDemand) { - val lastPush = _lastPush - val delta = max(0, now - lastPush) - - _lastPush = now - - logic.onPush(this, now, delta, demand) - } - } - State.Closed -> doStopSource(now) - State.Pending -> throw IllegalStateException("Illegal transition to pending state") - } + // Push to the consumer if the rate of the source has changed (after a call to `push`) + val newState = flags and ConnState + if (newState == ConnActive && flags and ConnPushed != 0) { + val lastPush = _lastPush + val delta = max(0, now - lastPush) - // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value - val newLimit = _demand + // Update state before calling into the outside world, so it observes a consistent state + _lastPush = now + _flags = (flags and ConnPushed.inv()) or ConnUpdateActive + hasUpdated = true - // Flush the changes to the flow - _activeDemand = newLimit - _deadline = deadline - _rate = min(capacity, newLimit) + logic.onPush(this, now, delta, _demand) - // Schedule an update at the new deadline - scheduleDelayed(now, deadline) + // IMPORTANT: Re-fetch the flags after the callback might have changed those + flags = _flags + } else if (newState == ConnClosed) { + hasUpdated = true + + // The source has called [FlowConnection.close], so clean up the connection + doStopSource(now) + } } catch (cause: Throwable) { + // Mark the connection as closed + flags = (flags and ConnState.inv()) or ConnClosed + doFailSource(now, cause) - } finally { - _isUpdateActive = false } - return willConverge - } + // Check whether the connection needs to be added to the visited queue. This is the case when: + // (1) An update was performed (either a push or a pull) + // (2) Either the source or consumer want to converge, and + // (3) Convergence is not already pending (ConnConvergePending) + if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) { + visited.add(this) + flags = flags or ConnConvergePending + } - /** - * Prune the elapsed timers from this context. - */ - fun updateTimers() { - // Invariant: Any pending timer should only point to a future timestamp - // See also `scheduleDelayed` - _timer = _pendingTimers.poll() - } + // Compute the new flow rate of the connection + // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value + _rate = min(_capacity, _demand) - /** - * Try to re-schedule the resource context in case it was skipped. - */ - fun tryReschedule(now: Long) { - val deadline = _deadline - if (deadline > now && deadline != Long.MAX_VALUE) { - scheduleDelayed(now, deadline) + // Indicate that no update is active anymore and flush the flags + _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv() + + val pendingTimers = _pendingTimers + + // Prune the head timer if this is a delayed update + val timer = if (!isImmediate) { + // Invariant: Any pending timer should only point to a future timestamp + // See also `scheduleDelayed` + val timer = pendingTimers.poll() + _timer = timer + timer + } else { + _timer + } + + // Set the new deadline and schedule a delayed update for that deadline + _deadline = newDeadline + + // Check whether we need to schedule a new timer for this connection. That is the case when: + // (1) The deadline is valid (not the maximum value) + // (2) The connection is active + // (3) The current active timer for the connection points to a later deadline + if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) { + // Ignore any deadline scheduled at the maximum value + // This indicates that the source does not want to register a timer + return + } + + // Construct a timer for the new deadline and add it to the global queue of timers + val newTimer = FlowEngineImpl.Timer(this, newDeadline) + _timer = newTimer + timerQueue.add(newTimer) + + // A timer already exists for this connection, so add it to the queue of pending timers + if (timer != null) { + pendingTimers.addFirst(timer) } } @@ -300,17 +330,22 @@ internal class FlowConsumerContextImpl( * This method is invoked when the system converges into a steady state. */ fun onConverge(now: Long) { - _willConverge = false - try { - if (_state == State.Active && shouldSourceConverge) { + val flags = _flags + + // The connection is converging now, so unset the convergence pending flag + _flags = flags and ConnConvergePending.inv() + + // Call the source converge callback if it has enabled convergence and the connection is active + if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) { val delta = max(0, now - _lastSourceConvergence) _lastSourceConvergence = now source.onConverge(this, now, delta) } - if (shouldConsumerConverge) { + // Call the consumer callback if it has enabled convergence + if (flags and ConnConvergeConsumer != 0) { val delta = max(0, now - _lastConsumerConvergence) _lastConsumerConvergence = now @@ -369,57 +404,16 @@ internal class FlowConsumerContextImpl( /** * Schedule an immediate update for this connection. */ - private fun scheduleImmediate() { + private fun scheduleImmediate(now: Long, flags: Int) { // In case an immediate update is already scheduled, no need to do anything - if (_isImmediateUpdateScheduled) { + if (flags and ConnUpdatePending != 0) { + _flags = flags return } - _isImmediateUpdateScheduled = true + // Mark the connection that there is an update pending + _flags = flags or ConnUpdatePending - val now = _clock.millis() engine.scheduleImmediate(now, this) } - - /** - * Schedule a delayed update for this resource context. - */ - private fun scheduleDelayed(now: Long, target: Long) { - // Ignore any target scheduled at the maximum value - // This indicates that the sources does not want to register a timer - if (target == Long.MAX_VALUE) { - return - } - - val timer = _timer - - if (timer == null) { - // No existing timer exists, so schedule a new timer and update the head - _timer = engine.scheduleDelayed(now, this, target) - } else if (target < timer.target) { - // Existing timer is further in the future, so schedule a new timer ahead of it - _timer = engine.scheduleDelayed(now, this, target) - _pendingTimers.addFirst(timer) - } - } - - /** - * The state of a flow connection. - */ - private enum class State { - /** - * The connection is pending and the consumer is waiting to consume the source. - */ - Pending, - - /** - * The connection is active and the source is currently being consumed. - */ - Active, - - /** - * The connection is closed and the source cannot be consumed through this connection anymore. - */ - Closed - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index c8170a43..019b5f10 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -63,39 +63,26 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /** * The systems that have been visited during the engine cycle. */ - private val visited = ArrayDeque() + private val visited: ArrayDeque = ArrayDeque() /** * The index in the batch stack. */ private var batchIndex = 0 - /** - * A flag to indicate that the engine is currently active. - */ - private val isRunning: Boolean - get() = batchIndex > 0 - /** * Update the specified [ctx] synchronously. */ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { - if (!ctx.doUpdate(now)) { - visited.add(ctx) - } + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) // In-case the engine is already running in the call-stack, return immediately. The changes will be picked // up by the active engine. - if (isRunning) { + if (batchIndex > 0) { return } - try { - batchIndex++ - runEngine(now) - } finally { - batchIndex-- - } + runEngine(now) } /** @@ -109,36 +96,11 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // In-case the engine is already running in the call-stack, return immediately. The changes will be picked // up by the active engine. - if (isRunning) { + if (batchIndex > 0) { return } - try { - batchIndex++ - runEngine(now) - } finally { - batchIndex-- - } - } - - /** - * Schedule the engine to run at [target] to update the flow contexts. - * - * This method will override earlier calls to this method for the same [ctx]. - * - * @param now The current virtual timestamp. - * @param ctx The flow context to which the event applies. - * @param target The timestamp when the interrupt should happen. - */ - fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer { - val futureQueue = futureQueue - - require(target >= now) { "Timestamp must be in the future" } - - val timer = Timer(ctx, target) - futureQueue.add(timer) - - return timer + runEngine(now) } override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) @@ -149,9 +111,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va override fun popBatch() { try { - // Flush the work if the platform is not already running + // Flush the work if the engine is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - runEngine(clock.millis()) + doRunEngine(clock.millis()) } } finally { batchIndex-- @@ -159,9 +121,21 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va } /** - * Run all the enqueued actions for the specified [timestamp][now]. + * Run the engine and mark as active while running. */ private fun runEngine(now: Long) { + try { + batchIndex++ + doRunEngine(now) + } finally { + batchIndex-- + } + } + + /** + * Run all the enqueued actions for the specified [timestamp][now]. + */ + private fun doRunEngine(now: Long) { val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations @@ -179,27 +153,16 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Execute all scheduled updates at current timestamp while (true) { val timer = futureQueue.peek() ?: break - val ctx = timer.ctx val target = timer.target - assert(target >= now) { "Internal inconsistency: found update of the past" } - if (target > now) { break } - futureQueue.poll() - - // Update the existing timers of the connection - ctx.updateTimers() + assert(target >= now) { "Internal inconsistency: found update of the past" } - if (ctx.shouldUpdate(now)) { - if (!ctx.doUpdate(now)) { - visited.add(ctx) - } - } else { - ctx.tryReschedule(now) - } + futureQueue.poll() + timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -208,17 +171,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Execute all immediate updates while (true) { val ctx = queue.poll() ?: break - - if (ctx.shouldUpdate(now) && !ctx.doUpdate(now)) { - visited.add(ctx) - } + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) } - for (system in visited) { - system.onConverge(now) + while (true) { + val ctx = visited.poll() ?: break + ctx.onConverge(now) } - - visited.clear() } while (queue.isNotEmpty()) // Schedule an engine invocation for the next update to occur. @@ -242,18 +201,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Case 2: A new timer was registered ahead of the other timers. // Solution: Schedule a new scheduler invocation @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout( - target - now, - { - try { - batchIndex++ - runEngine(target) - } finally { - batchIndex-- - } - }, - context - ) + val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context) scheduled.addFirst(Invocation(target, handle)) break } else if (invocation.timestamp < target) { @@ -274,7 +222,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case * the invocation is not needed anymore, it can be cancelled via [cancel]. */ - private data class Invocation( + private class Invocation( @JvmField val timestamp: Long, @JvmField val handle: DisposableHandle ) { -- cgit v1.2.3 From 081221684fb826ab5a00c1d8cc5a9886b9e2203c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 1 Oct 2021 22:04:35 +0200 Subject: feat(simulator): Expose CPU time counters directly on hypervisor This change adds a new interface to the SimHypervisor interface that exposes the CPU time counters directly. These are derived from the flow counters and will be used by SimHost to expose them via telemetry. --- .../org/opendc/simulator/flow/FlowBenchmarks.kt | 16 +- .../opendc/simulator/flow/AbstractFlowConsumer.kt | 20 ++- .../org/opendc/simulator/flow/FlowCounters.kt | 4 +- .../org/opendc/simulator/flow/FlowForwarder.kt | 3 +- .../simulator/flow/internal/FlowCountersImpl.kt | 6 +- .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 38 ++++- .../flow/mux/ForwardingFlowMultiplexer.kt | 114 ++++++++------ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 164 ++++++++++++++++----- .../org/opendc/simulator/flow/FlowForwarderTest.kt | 2 +- .../flow/mux/ExclusiveFlowMultiplexerTest.kt | 154 ------------------- .../flow/mux/ForwardingFlowMultiplexerTest.kt | 154 +++++++++++++++++++ .../flow/mux/MaxMinFlowMultiplexerTest.kt | 12 +- 12 files changed, 419 insertions(+), 268 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index 4834f10f..e927f81d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -83,8 +83,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -96,8 +96,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = MaxMinFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(3) { launch { @@ -113,8 +113,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) @@ -126,8 +126,8 @@ class FlowBenchmarks { return scope.runBlockingSimulation { val switch = ForwardingFlowMultiplexer(engine) - switch.addOutput(FlowSink(engine, 3000.0)) - switch.addOutput(FlowSink(engine, 3000.0)) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) + FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) repeat(2) { launch { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index c8092082..b02426e3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -83,14 +83,18 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * The previous demand for the consumer. */ - private var previousDemand = 0.0 + private var _previousDemand = 0.0 + private var _previousCapacity = 0.0 /** * Update the counters of the flow consumer. */ protected fun updateCounters(ctx: FlowConnection, delta: Long) { - val demand = previousDemand - previousDemand = ctx.demand + val demand = _previousDemand + val capacity = _previousCapacity + + _previousDemand = ctx.demand + _previousCapacity = ctx.capacity if (delta <= 0) { return @@ -98,23 +102,23 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi val counters = _counters val deltaS = delta / 1000.0 - val work = demand * deltaS + val total = demand * deltaS + val work = capacity * deltaS val actualWork = ctx.rate * deltaS - val remainingWork = work - actualWork counters.demand += work counters.actual += actualWork - counters.overcommit += remainingWork + counters.remaining += (total - actualWork) } /** * Update the counters of the flow consumer. */ - protected fun updateCounters(demand: Double, actual: Double, overcommit: Double) { + protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters.demand += demand counters.actual += actual - counters.overcommit += overcommit + counters.remaining += remaining } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index e15d7643..a717ae6e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -37,9 +37,9 @@ public interface FlowCounters { public val actual: Double /** - * The accumulated flow that could not be transferred over the connection. + * The amount of capacity that was not utilized. */ - public val overcommit: Double + public val remaining: Double /** * The accumulated flow lost due to interference between sources. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 17de601a..7eaaf6c2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -242,10 +242,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val counters = _counters val deltaS = delta / 1000.0 + val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS counters.demand += work counters.actual += actualWork - counters.overcommit += (work - actualWork) + counters.remaining += (total - actualWork) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt index 141d335d..d2fa5228 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt @@ -30,17 +30,17 @@ import org.opendc.simulator.flow.FlowCounters internal class FlowCountersImpl : FlowCounters { override var demand: Double = 0.0 override var actual: Double = 0.0 - override var overcommit: Double = 0.0 + override var remaining: Double = 0.0 override var interference: Double = 0.0 override fun reset() { demand = 0.0 actual = 0.0 - overcommit = 0.0 + remaining = 0.0 interference = 0.0 } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 17b82391..04ba7f21 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -39,7 +39,22 @@ public interface FlowMultiplexer { /** * The outputs of the multiplexer over which the flows will be distributed. */ - public val outputs: Set + public val outputs: Set + + /** + * The actual processing rate of the multiplexer. + */ + public val rate: Double + + /** + * The demanded processing rate of the input. + */ + public val demand: Double + + /** + * The capacity of the outputs. + */ + public val capacity: Double /** * The flow counters to track the flow metrics of all multiplexer inputs. @@ -59,12 +74,27 @@ public interface FlowMultiplexer { public fun removeInput(input: FlowConsumer) /** - * Add the specified [output] to the multiplexer. + * Create a new output on this multiplexer. */ - public fun addOutput(output: FlowConsumer) + public fun newOutput(): FlowSource /** - * Clear all inputs and outputs from the switch. + * Remove [output] from this multiplexer. + */ + public fun removeOutput(output: FlowSource) + + /** + * Clear all inputs and outputs from the multiplexer. */ public fun clear() + + /** + * Clear the inputs of the multiplexer. + */ + public fun clearInputs() + + /** + * Clear the outputs of the multiplexer. + */ + public fun clearOutputs() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 6dd9dcfb..125d10fe 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -38,35 +38,44 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul get() = _inputs private val _inputs = mutableSetOf() - override val outputs: Set + override val outputs: Set get() = _outputs - private val _outputs = mutableSetOf() - private val _availableOutputs = ArrayDeque() + private val _outputs = mutableSetOf() + private val _availableOutputs = ArrayDeque() override val counters: FlowCounters = object : FlowCounters { override val demand: Double - get() = _outputs.sumOf { it.counters.demand } + get() = _outputs.sumOf { it.forwarder.counters.demand } override val actual: Double - get() = _outputs.sumOf { it.counters.actual } - override val overcommit: Double - get() = _outputs.sumOf { it.counters.overcommit } + get() = _outputs.sumOf { it.forwarder.counters.actual } + override val remaining: Double + get() = _outputs.sumOf { it.forwarder.counters.remaining } override val interference: Double - get() = _outputs.sumOf { it.counters.interference } + get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { - for (input in _outputs) { - input.counters.reset() + for (output in _outputs) { + output.forwarder.counters.reset() } } - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,overcommit=$overcommit]" + override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } + override val rate: Double + get() = _outputs.sumOf { it.forwarder.rate } + + override val demand: Double + get() = _outputs.sumOf { it.forwarder.demand } + + override val capacity: Double + get() = _outputs.sumOf { it.forwarder.capacity } + override fun newInput(key: InterferenceKey?): FlowConsumer { - val forwarder = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val output = Input(forwarder) - _inputs += output - return output + val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } + val input = Input(output) + _inputs += input + return input } override fun removeInput(input: FlowConsumer) { @@ -74,51 +83,72 @@ public class ForwardingFlowMultiplexer(private val engine: FlowEngine) : FlowMul return } - (input as Input).close() + val output = (input as Input).output + output.forwarder.cancel() + _availableOutputs += output } - override fun addOutput(output: FlowConsumer) { - if (output in outputs) { - return - } - + override fun newOutput(): FlowSource { val forwarder = FlowForwarder(engine) + val output = Output(forwarder) _outputs += output - _availableOutputs += forwarder + return output + } - output.startConsumer(object : FlowSource by forwarder { - override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _outputs -= output + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } - forwarder.onStop(conn, now, delta) - } - }) + val forwarder = (output as Output).forwarder + forwarder.close() } - override fun clear() { - for (input in _outputs) { - input.cancel() + override fun clearInputs() { + for (input in _inputs) { + val output = input.output + output.forwarder.cancel() + _availableOutputs += output } - _outputs.clear() - // Inputs are implicitly cancelled by the output forwarders _inputs.clear() } + override fun clearOutputs() { + for (output in _outputs) { + output.forwarder.cancel() + } + _outputs.clear() + _availableOutputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() + } + /** * An input on the multiplexer. */ - private inner class Input(private val forwarder: FlowForwarder) : FlowConsumer by forwarder { - /** - * Close the input. - */ - fun close() { - // We explicitly do not close the forwarder here in order to re-use it across input resources. - _inputs -= this - _availableOutputs += forwarder + private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { + override fun toString(): String = "ForwardingFlowMultiplexer.Input" + } + + /** + * An output on the multiplexer. + */ + private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { + override fun onStart(conn: FlowConnection, now: Long) { + _availableOutputs += this + forwarder.onStart(conn, now) } - override fun toString(): String = "ForwardingFlowMultiplexer.Input" + override fun onStop(conn: FlowConnection, now: Long, delta: Long) { + forwarder.cancel() + forwarder.onStop(conn, now, delta) + } + + override fun toString(): String = "ForwardingFlowMultiplexer.Output" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 7232df35..5ff0fb8d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -52,9 +52,9 @@ public class MaxMinFlowMultiplexer( /** * The outputs of the multiplexer. */ - override val outputs: Set + override val outputs: Set get() = _outputs - private val _outputs = mutableSetOf() + private val _outputs = mutableSetOf() private val _activeOutputs = mutableListOf() /** @@ -67,22 +67,35 @@ public class MaxMinFlowMultiplexer( /** * The actual processing rate of the multiplexer. */ + public override val rate: Double + get() = _rate private var _rate = 0.0 /** * The demanded processing rate of the input. */ + public override val demand: Double + get() = _demand private var _demand = 0.0 /** * The capacity of the outputs. */ + public override val capacity: Double + get() = _capacity private var _capacity = 0.0 /** * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) @@ -90,14 +103,6 @@ public class MaxMinFlowMultiplexer( return provider } - override fun addOutput(output: FlowConsumer) { - val consumer = Output(output) - if (_outputs.add(output)) { - _activeOutputs.add(consumer) - output.startConsumer(consumer) - } - } - override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { return @@ -106,16 +111,38 @@ public class MaxMinFlowMultiplexer( (input as Input).close() } - override fun clear() { - for (input in _activeOutputs) { + override fun newOutput(): FlowSource { + val output = Output() + _outputs.add(output) + return output + } + + override fun removeOutput(output: FlowSource) { + if (!_outputs.remove(output)) { + return + } + + // This cast should always succeed since only `Output` instances should be added to `_outputs` + (output as Output).cancel() + } + + override fun clearInputs() { + for (input in _inputs) { input.cancel() } - _activeOutputs.clear() + _inputs.clear() + } - for (output in _activeInputs) { + override fun clearOutputs() { + for (output in _outputs) { output.cancel() } - _activeInputs.clear() + _outputs.clear() + } + + override fun clear() { + clearOutputs() + clearInputs() } /** @@ -125,10 +152,13 @@ public class MaxMinFlowMultiplexer( if (_schedulerActive) { return } - + val lastSchedulerCycle = _lastSchedulerCycle + val delta = max(0, now - lastSchedulerCycle) _schedulerActive = true + _lastSchedulerCycle = now + try { - doSchedule(now) + doSchedule(now, delta) } finally { _schedulerActive = false } @@ -137,12 +167,17 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. */ - private fun doSchedule(now: Long) { + private fun doSchedule(now: Long, delta: Long) { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + // Update the counters of the scheduler + updateCounters(delta) + // If there is no work yet, mark the inputs as idle. if (activeInputs.isEmpty()) { + _demand = 0.0 + _rate = 0.0 return } @@ -156,6 +191,7 @@ public class MaxMinFlowMultiplexer( // Remove outputs that have finished if (!input.isActive) { + input.actualRate = 0.0 inputIterator.remove() } } @@ -168,7 +204,8 @@ public class MaxMinFlowMultiplexer( // Divide the available output capacity fairly over the inputs using max-min fair sharing var remaining = activeInputs.size - for (input in activeInputs) { + for (i in activeInputs.indices) { + val input = activeInputs[i] val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) @@ -192,7 +229,8 @@ public class MaxMinFlowMultiplexer( activeOutputs.sort() // Divide the requests over the available capacity of the input resources fairly - for (output in activeOutputs) { + for (i in activeOutputs.indices) { + val output = activeOutputs[i] val inputCapacity = output.capacity val fraction = inputCapacity / capacity val grantedSpeed = rate * fraction @@ -219,6 +257,29 @@ public class MaxMinFlowMultiplexer( } } + /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 + + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = _capacity + + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 + + _counters.demand += _demand * deltaS + _counters.actual += _rate * deltaS + _counters.remaining += (previousCapacity - _rate) * deltaS + } + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ @@ -252,6 +313,11 @@ public class MaxMinFlowMultiplexer( */ private var _lastPull: Long = Long.MIN_VALUE + /** + * The interference domain this input belongs to. + */ + private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + /** * Close the input. * @@ -269,7 +335,6 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this - if (parent != null) { ctx.shouldConsumerConverge = true } @@ -287,14 +352,22 @@ public class MaxMinFlowMultiplexer( doUpdateCounters(delta) actualRate = 0.0 - this.limit = rate + limit = rate _lastPull = now runScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + val lastConverge = _lastConverge + val parent = parent + + if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { + _lastConverge = now + _lastConvergeInput = this + + parent.onConverge(now, max(0, now - lastConverge)) + } } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -303,6 +376,14 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 _lastPull = now + + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == this) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + runScheduler(now) } /* Comparable */ @@ -328,35 +409,31 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / capacity + val load = _rate / _capacity interferenceDomain.apply(key, load) } else { 1.0 } val deltaS = delta / 1000.0 - val work = limit * deltaS - val actualWork = actualRate * deltaS - val remainingWork = work - actualWork + val demand = limit * deltaS + val actual = actualRate * deltaS + val remaining = (capacity - actualRate) * deltaS - updateCounters(work, actualWork, remainingWork) + updateCounters(demand, actual, remaining) - val distCounters = _counters - distCounters.demand += work - distCounters.actual += actualWork - distCounters.overcommit += remainingWork - distCounters.interference += actualWork * max(0.0, 1 - perfScore) + _counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output(private val provider: FlowConsumer) : FlowSource, Comparable { + private inner class Output : FlowSource, Comparable { /** * The active [FlowConnection] of this source. */ - private var _ctx: FlowConnection? = null + private var _conn: FlowConnection? = null /** * The capacity of this output. @@ -367,27 +444,33 @@ public class MaxMinFlowMultiplexer( * Push the specified rate to the consumer. */ fun push(rate: Double) { - _ctx?.push(rate) + _conn?.push(rate) } /** * Cancel this output. */ fun cancel() { - provider.cancel() + _conn?.close() } override fun onStart(conn: FlowConnection, now: Long) { - assert(_ctx == null) { "Source running concurrently" } - _ctx = conn + assert(_conn == null) { "Source running concurrently" } + _conn = conn capacity = conn.capacity + _activeOutputs.add(this) + updateCapacity() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { - _ctx = null + _conn = null capacity = 0.0 + _activeOutputs.remove(this) + updateCapacity() + + runScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -397,6 +480,7 @@ public class MaxMinFlowMultiplexer( updateCapacity() } + // Re-run scheduler to distribute new load runScheduler(now) return Long.MAX_VALUE } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt index d548451f..12e72b8f 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt @@ -217,7 +217,7 @@ internal class FlowForwarderTest { assertEquals(2.0, source.counters.actual) assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } - assertEquals(source.counters.overcommit, forwarder.counters.overcommit) { "Overcommitted work" } + assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } assertEquals(2000, clock.millis()) } diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt deleted file mode 100644 index 3475f027..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ExclusiveFlowMultiplexerTest.kt +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class ExclusiveFlowMultiplexerTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ), - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - switch.addOutput(forwarder) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - isFirst = true - } - - override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runBlockingSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - switch.addOutput(source) - - switch.newInput() - assertThrows { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt new file mode 100644 index 00000000..187dacd9 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.mux + +import kotlinx.coroutines.yield +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.opendc.simulator.core.runBlockingSimulation +import org.opendc.simulator.flow.* +import org.opendc.simulator.flow.internal.FlowEngineImpl +import org.opendc.simulator.flow.source.FixedFlowSource +import org.opendc.simulator.flow.source.FlowSourceRateAdapter +import org.opendc.simulator.flow.source.TraceFlowSource + +/** + * Test suite for the [ForwardingFlowMultiplexer] class. + */ +internal class ForwardingFlowMultiplexerTest { + /** + * Test a trace workload. + */ + @Test + fun testTrace() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val speed = mutableListOf() + + val duration = 5 * 60L + val workload = + TraceFlowSource( + sequenceOf( + TraceFlowSource.Fragment(duration * 1000, 28.0), + TraceFlowSource.Fragment(duration * 1000, 3500.0), + TraceFlowSource.Fragment(duration * 1000, 0.0), + TraceFlowSource.Fragment(duration * 1000, 183.0) + ), + ) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + val forwarder = FlowForwarder(engine) + val adapter = FlowSourceRateAdapter(forwarder, speed::add) + source.startConsumer(adapter) + forwarder.startConsumer(switch.newOutput()) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertAll( + { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, + { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } + ) + } + + /** + * Test runtime workload on hypervisor. + */ + @Test + fun testRuntimeWorkload() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = FixedFlowSource(duration * 3.2, 1.0) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + source.startConsumer(switch.newOutput()) + + val provider = switch.newInput() + provider.consume(workload) + yield() + + assertEquals(duration, clock.millis()) { "Took enough time" } + } + + /** + * Test two workloads running sequentially. + */ + @Test + fun testTwoWorkloads() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val duration = 5 * 60L * 1000 + val workload = object : FlowSource { + var isFirst = true + + override fun onStart(conn: FlowConnection, now: Long) { + isFirst = true + } + + override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { + return if (isFirst) { + isFirst = false + conn.push(1.0) + duration + } else { + conn.close() + Long.MAX_VALUE + } + } + } + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + source.startConsumer(switch.newOutput()) + + val provider = switch.newInput() + provider.consume(workload) + yield() + provider.consume(workload) + assertEquals(duration * 2, clock.millis()) { "Took enough time" } + } + + /** + * Test concurrent workloads on the machine. + */ + @Test + fun testConcurrentWorkloadFails() = runBlockingSimulation { + val engine = FlowEngineImpl(coroutineContext, clock) + + val switch = ForwardingFlowMultiplexer(engine) + val source = FlowSink(engine, 3200.0) + + source.startConsumer(switch.newOutput()) + + switch.newInput() + assertThrows { switch.newInput() } + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt index 9f6b8a2c..6e2cdb98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt @@ -44,7 +44,7 @@ internal class MaxMinFlowMultiplexerTest { val switch = MaxMinFlowMultiplexer(scheduler) val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { switch.addOutput(it) } + sources.forEach { it.startConsumer(switch.newOutput()) } val provider = switch.newInput() val consumer = FixedFlowSource(2000.0, 1.0) @@ -76,10 +76,11 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val provider = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) provider.consume(workload) yield() } finally { @@ -89,7 +90,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(90000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } @@ -122,11 +123,12 @@ internal class MaxMinFlowMultiplexerTest { ) val switch = MaxMinFlowMultiplexer(scheduler) + val sink = FlowSink(scheduler, 3200.0) val providerA = switch.newInput() val providerB = switch.newInput() try { - switch.addOutput(FlowSink(scheduler, 3200.0)) + sink.startConsumer(switch.newOutput()) coroutineScope { launch { providerA.consume(workloadA) } @@ -140,7 +142,7 @@ internal class MaxMinFlowMultiplexerTest { assertAll( { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(1020000.0, switch.counters.overcommit, "Overcommitted work does not match") }, + { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, { assertEquals(1200000, clock.millis()) } ) } -- cgit v1.2.3