diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
33 files changed, 0 insertions, 4783 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt deleted file mode 100644 index 9e0a4a5e..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import kotlinx.coroutines.launch -import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer -import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation -import org.openjdk.jmh.annotations.Benchmark -import org.openjdk.jmh.annotations.Fork -import org.openjdk.jmh.annotations.Measurement -import org.openjdk.jmh.annotations.Scope -import org.openjdk.jmh.annotations.Setup -import org.openjdk.jmh.annotations.State -import org.openjdk.jmh.annotations.Warmup -import java.util.concurrent.ThreadLocalRandom -import java.util.concurrent.TimeUnit - -@State(Scope.Thread) -@Fork(1) -@Warmup(iterations = 2, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) -class FlowBenchmarks { - private lateinit var trace: Sequence<TraceFlowSource.Fragment> - - @Setup - fun setUp() { - val random = ThreadLocalRandom.current() - val entries = List(1000000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } - trace = entries.asSequence() - } - - @Benchmark - fun benchmarkSink() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val provider = FlowSink(engine, 4200.0) - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkForward() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val provider = FlowSink(engine, 4200.0) - val forwarder = FlowForwarder(engine) - provider.startConsumer(forwarder) - return@runSimulation forwarder.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxMaxMinSingleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - val provider = switch.newInput() - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxMaxMinTripleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - repeat(3) { - launch { - val provider = switch.newInput() - provider.consume(TraceFlowSource(trace)) - } - } - } - } - - @Benchmark - fun benchmarkMuxExclusiveSingleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = ForwardingFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - val provider = switch.newInput() - return@runSimulation provider.consume(TraceFlowSource(trace)) - } - } - - @Benchmark - fun benchmarkMuxExclusiveTripleSource() { - return runSimulation { - val engine = FlowEngine(coroutineContext, clock) - val switch = ForwardingFlowMultiplexer(engine) - - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) - - repeat(2) { - launch { - val provider = switch.newInput() - provider.consume(TraceFlowSource(trace)) - } - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt deleted file mode 100644 index 8ff0bc76..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * An active connection between a [FlowSource] and [FlowConsumer]. - */ -public interface FlowConnection : AutoCloseable { - /** - * The capacity of the connection. - */ - public val capacity: Double - - /** - * The flow rate over the connection. - */ - public val rate: Double - - /** - * The flow demand of the source. - */ - public val demand: Double - - /** - * A flag to control whether [FlowSource.onConverge] should be invoked for this source. - */ - public var shouldSourceConverge: Boolean - - /** - * Pull the source. - */ - public fun pull() - - /** - * Pull the source. - * - * @param now The timestamp at which the connection is pulled. - */ - public fun pull(now: Long) - - /** - * Push the given flow [rate] over this connection. - * - * @param rate The rate of the flow to push. - */ - public fun push(rate: Double) - - /** - * Disconnect the consumer from its source. - */ - public override fun close() -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt deleted file mode 100644 index a49826f4..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumer.kt +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException - -/** - * A consumer of a [FlowSource]. - */ -public interface FlowConsumer { - /** - * A flag to indicate that the consumer is currently consuming a [FlowSource]. - */ - public val isActive: Boolean - - /** - * The flow capacity of this consumer. - */ - public val capacity: Double - - /** - * The current flow rate of the consumer. - */ - public val rate: Double - - /** - * The current flow demand. - */ - public val demand: Double - - /** - * The flow counters to track the flow metrics of the consumer. - */ - public val counters: FlowCounters - - /** - * Start consuming the specified [source]. - * - * @throws IllegalStateException if the consumer is already active. - */ - public fun startConsumer(source: FlowSource) - - /** - * Ask the consumer to pull its source. - * - * If the consumer is not active, this operation will be a no-op. - */ - public fun pull() - - /** - * Disconnect the consumer from its source. - * - * If the consumer is not active, this operation will be a no-op. - */ - public fun cancel() -} - -/** - * Consume the specified [source] and suspend execution until the source is fully consumed or failed. - */ -public suspend fun FlowConsumer.consume(source: FlowSource) { - return suspendCancellableCoroutine { cont -> - startConsumer(object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - try { - source.onStart(conn, now) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onStop(conn: FlowConnection, now: Long) { - try { - source.onStop(conn, now) - - if (!cont.isCompleted) { - cont.resume(Unit) - } - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return try { - source.onPull(conn, now) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun onConverge(conn: FlowConnection, now: Long) { - try { - source.onConverge(conn, now) - } catch (cause: Throwable) { - cont.resumeWithException(cause) - throw cause - } - } - - override fun toString(): String = "SuspendingFlowSource" - }) - - cont.invokeOnCancellation { cancel() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt deleted file mode 100644 index 98922ab3..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A controllable [FlowConnection]. - * - * This interface is used by [FlowConsumer]s to control the connection between it and the source. - */ -public interface FlowConsumerContext : FlowConnection { - /** - * The deadline of the source. - */ - public val deadline: Long - - /** - * The capacity of the connection. - */ - public override var capacity: Double - - /** - * A flag to control whether [FlowConsumerLogic.onConverge] should be invoked for the consumer. - */ - public var shouldConsumerConverge: Boolean - - /** - * A flag to control whether the timers for the [FlowSource] should be enabled. - */ - public var enableTimers: Boolean - - /** - * Start the flow over the connection. - */ - public fun start() - - /** - * Synchronously pull the source of the connection. - * - * @param now The timestamp at which the connection is pulled. - */ - public fun pullSync(now: Long) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt deleted file mode 100644 index 1d3adb10..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerLogic.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A collection of callbacks associated with a [FlowConsumer]. - */ -public interface FlowConsumerLogic { - /** - * This method is invoked when a [FlowSource] changes the rate of flow to this consumer. - * - * @param ctx The context in which the provider runs. - * @param now The virtual timestamp in milliseconds at which the update is occurring. - * @param rate The requested processing rate of the source. - */ - public fun onPush(ctx: FlowConsumerContext, now: Long, rate: Double) {} - - /** - * This method is invoked when the flow graph has converged into a steady-state system. - * - * Make sure to enable [FlowConsumerContext.shouldSourceConverge] if you need this callback. By default, this method - * will not be invoked. - * - * @param ctx The context in which the provider runs. - * @param now The virtual timestamp in milliseconds at which the system converged. - */ - public fun onConverge(ctx: FlowConsumerContext, now: Long) {} - - /** - * This method is invoked when the [FlowSource] completed or failed. - * - * @param ctx The context in which the provider runs. - * @param now The virtual timestamp in milliseconds at which the provider finished. - * @param cause The cause of the failure or `null` if the source completed. - */ - public fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) {} -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt deleted file mode 100644 index 62cb10d1..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConvergenceListener.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A listener interface for when a flow stage has converged into a steady-state. - */ -public interface FlowConvergenceListener { - /** - * This method is invoked when the system has converged to a steady-state. - * - * @param now The timestamp at which the system converged. - */ - public fun onConverge(now: Long) {} -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt deleted file mode 100644 index d8ad7978..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * An interface that tracks cumulative counts of the flow accumulation over a stage. - */ -public interface FlowCounters { - /** - * The accumulated flow that a source wanted to push over the connection. - */ - public val demand: Double - - /** - * The accumulated flow that was actually transferred over the connection. - */ - public val actual: Double - - /** - * The amount of capacity that was not utilized. - */ - public val remaining: Double - - /** - * Reset the flow counters. - */ - public fun reset() -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt deleted file mode 100644 index 65224827..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowEngine.kt +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.FlowEngineImpl -import java.time.Clock -import kotlin.coroutines.CoroutineContext - -/** - * A [FlowEngine] is responsible for managing the interaction between [FlowSource]s and [FlowConsumer]s. - * - * The engine centralizes the scheduling logic of state updates of flow connections, allowing update propagation - * to happen more efficiently. and overall, reducing the work necessary to transition into a steady state. - */ -public interface FlowEngine { - /** - * The virtual [Clock] associated with this engine. - */ - public val clock: Clock - - /** - * Create a new [FlowConsumerContext] with the given [provider]. - * - * @param consumer The consumer logic. - * @param provider The logic of the resource provider. - */ - public fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext - - /** - * Start batching the execution of resource updates until [popBatch] is called. - * - * This method is useful if you want to propagate multiple resources updates (e.g., starting multiple CPUs - * simultaneously) in a single state update. - * - * Multiple calls to this method requires the same number of [popBatch] calls in order to properly flush the - * resource updates. This allows nested calls to [pushBatch], but might cause issues if [popBatch] is not called - * the same amount of times. To simplify batching, see [batch]. - */ - public fun pushBatch() - - /** - * Stop the batching of resource updates and run the interpreter on the batch. - * - * Note that method will only flush the event once the first call to [pushBatch] has received a [popBatch] call. - */ - public fun popBatch() - - public companion object { - /** - * Construct a new [FlowEngine] implementation. - * - * @param context The coroutine context to use. - * @param clock The virtual simulation clock. - */ - @JvmStatic - @JvmName("create") - public operator fun invoke(context: CoroutineContext, clock: Clock): FlowEngine { - return FlowEngineImpl(context, clock) - } - } -} - -/** - * Batch the execution of several interrupts into a single call. - * - * This method is useful if you want to propagate the start of multiple resources (e.g., CPUs) in a single update. - */ -public inline fun FlowEngine.batch(block: () -> Unit) { - try { - pushBatch() - block() - } finally { - popBatch() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt deleted file mode 100644 index 5202c252..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import mu.KotlinLogging -import org.opendc.simulator.flow.internal.D_MS_TO_S -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * The logging instance of this connection. - */ -private val logger = KotlinLogging.logger {} - -/** - * A class that acts as a [FlowSource] and [FlowConsumer] at the same time. - * - * @param engine The [FlowEngine] the forwarder runs in. - * @param listener The convergence lister to use. - * @param isCoupled A flag to indicate that the transformer will exit when the resource consumer exits. - */ -public class FlowForwarder( - private val engine: FlowEngine, - private val listener: FlowConvergenceListener? = null, - private val isCoupled: Boolean = false -) : FlowSource, FlowConsumer, AutoCloseable { - /** - * The delegate [FlowSource]. - */ - private var delegate: FlowSource? = null - - /** - * A flag to indicate that the delegate was started. - */ - private var hasDelegateStarted: Boolean = false - - /** - * The exposed [FlowConnection]. - */ - private val _ctx = object : FlowConnection { - override var shouldSourceConverge: Boolean = false - set(value) { - field = value - _innerCtx?.shouldSourceConverge = value - } - - override val capacity: Double - get() = _innerCtx?.capacity ?: 0.0 - - override val demand: Double - get() = _innerCtx?.demand ?: 0.0 - - override val rate: Double - get() = _innerCtx?.rate ?: 0.0 - - override fun pull() { - _innerCtx?.pull() - } - - override fun pull(now: Long) { - _innerCtx?.pull(now) - } - - override fun push(rate: Double) { - if (delegate == null) { - return - } - - _innerCtx?.push(rate) - _demand = rate - } - - override fun close() { - val delegate = delegate ?: return - val hasDelegateStarted = hasDelegateStarted - - // Warning: resumption of the continuation might change the entire state of the forwarder. Make sure we - // reset beforehand the existing state and check whether it has been updated afterwards - reset() - - if (hasDelegateStarted) { - val now = engine.clock.millis() - delegate.onStop(this, now) - } - } - } - - /** - * The [FlowConnection] in which the forwarder runs. - */ - private var _innerCtx: FlowConnection? = null - - override val isActive: Boolean - get() = delegate != null - - override val capacity: Double - get() = _ctx.capacity - - override val rate: Double - get() = _ctx.rate - - override val demand: Double - get() = _ctx.demand - - override val counters: FlowCounters - get() = _counters - private val _counters = MutableFlowCounters() - - override fun startConsumer(source: FlowSource) { - check(delegate == null) { "Forwarder already active" } - - delegate = source - - // Pull to replace the source - pull() - } - - override fun pull() { - _ctx.pull() - } - - override fun cancel() { - _ctx.close() - } - - override fun close() { - val ctx = _innerCtx - - if (ctx != null) { - this._innerCtx = null - ctx.pull() - } - } - - override fun onStart(conn: FlowConnection, now: Long) { - _innerCtx = conn - - if (listener != null || _ctx.shouldSourceConverge) { - conn.shouldSourceConverge = true - } - } - - override fun onStop(conn: FlowConnection, now: Long) { - _innerCtx = null - - val delegate = delegate - if (delegate != null) { - reset() - - try { - delegate.onStop(this._ctx, now) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - } - } - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val delegate = delegate - - if (!hasDelegateStarted) { - start() - } - - updateCounters(conn, now) - - return try { - delegate?.onPull(_ctx, now) ?: Long.MAX_VALUE - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - - reset() - Long.MAX_VALUE - } - } - - override fun onConverge(conn: FlowConnection, now: Long) { - try { - delegate?.onConverge(this._ctx, now) - listener?.onConverge(now) - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - - _innerCtx = null - reset() - } - } - - /** - * Start the delegate. - */ - private fun start() { - val delegate = delegate ?: return - - try { - val now = engine.clock.millis() - delegate.onStart(_ctx, now) - hasDelegateStarted = true - _lastUpdate = now - } catch (cause: Throwable) { - logger.error(cause) { "Uncaught exception" } - reset() - } - } - - /** - * Reset the delegate. - */ - private fun reset() { - if (isCoupled) { - _innerCtx?.close() - } else { - _innerCtx?.push(0.0) - } - - delegate = null - hasDelegateStarted = false - } - - /** - * The requested flow rate. - */ - private var _demand: Double = 0.0 - private var _lastUpdate = 0L - - /** - * Update the flow counters for the transformer. - */ - private fun updateCounters(ctx: FlowConnection, now: Long) { - val lastUpdate = _lastUpdate - _lastUpdate = now - val delta = now - lastUpdate - if (delta <= 0) { - return - } - - val counters = _counters - val deltaS = delta * D_MS_TO_S - val total = ctx.capacity * deltaS - val work = _demand * deltaS - val actualWork = ctx.rate * deltaS - - counters.increment(work, actualWork, (total - actualWork)) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt deleted file mode 100644 index af702701..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowMapper.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A [FlowConsumer] that maps the pushed flow through [transform]. - * - * @param source The source of the flow. - * @param transform The method to transform the flow. - */ -public class FlowMapper( - private val source: FlowSource, - private val transform: (FlowConnection, Double) -> Double -) : FlowSource { - - /** - * The current active connection. - */ - private var _conn: Connection? = null - - override fun onStart(conn: FlowConnection, now: Long) { - check(_conn == null) { "Concurrent access" } - val delegate = Connection(conn, transform) - _conn = delegate - source.onStart(delegate, now) - } - - override fun onStop(conn: FlowConnection, now: Long) { - val delegate = checkNotNull(_conn) { "Invariant violation" } - _conn = null - source.onStop(delegate, now) - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val delegate = checkNotNull(_conn) { "Invariant violation" } - return source.onPull(delegate, now) - } - - override fun onConverge(conn: FlowConnection, now: Long) { - val delegate = _conn ?: return - source.onConverge(delegate, now) - } - - /** - * The wrapper [FlowConnection] that is used to transform the flow. - */ - private class Connection( - private val delegate: FlowConnection, - private val transform: (FlowConnection, Double) -> Double - ) : FlowConnection by delegate { - override fun push(rate: Double) { - delegate.push(transform(this, rate)) - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt deleted file mode 100644 index ee8cd739..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.D_MS_TO_S -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * A [FlowSink] represents a sink with a fixed capacity. - * - * @param initialCapacity The initial capacity of the resource. - * @param engine The engine that is used for driving the flow simulation. - * @param parent The parent flow system. - */ -public class FlowSink( - private val engine: FlowEngine, - initialCapacity: Double, - private val parent: FlowConvergenceListener? = null -) : FlowConsumer { - /** - * A flag to indicate that the flow consumer is active. - */ - public override val isActive: Boolean - get() = _ctx != null - - /** - * The capacity of the consumer. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - _ctx?.capacity = value - } - - /** - * The current processing rate of the consumer. - */ - public override val rate: Double - get() = _ctx?.rate ?: 0.0 - - /** - * The flow processing rate demand at this instant. - */ - public override val demand: Double - get() = _ctx?.demand ?: 0.0 - - /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = MutableFlowCounters() - - /** - * The current active [FlowConsumerLogic] of this sink. - */ - private var _ctx: FlowConsumerContext? = null - - override fun startConsumer(source: FlowSource) { - check(_ctx == null) { "Consumer is in invalid state" } - - val ctx = engine.newContext(source, Logic(parent, _counters)) - _ctx = ctx - - ctx.capacity = capacity - if (parent != null) { - ctx.shouldConsumerConverge = true - } - - ctx.start() - } - - override fun pull() { - _ctx?.pull() - } - - override fun cancel() { - _ctx?.close() - } - - override fun toString(): String = "FlowSink[capacity=$capacity]" - - /** - * [FlowConsumerLogic] of a sink. - */ - private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { - - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - rate: Double - ) { - updateCounters(ctx, now, rate, ctx.capacity) - } - - override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { - updateCounters(ctx, now, 0.0, 0.0) - - _ctx = null - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long) { - parent?.onConverge(now) - } - - /** - * The previous demand and capacity for the consumer. - */ - private val _previous = DoubleArray(2) - private var _previousUpdate = Long.MAX_VALUE - - /** - * Update the counters of the flow consumer. - */ - private fun updateCounters(ctx: FlowConnection, now: Long, nextDemand: Double, nextCapacity: Double) { - val previousUpdate = _previousUpdate - _previousUpdate = now - val delta = now - previousUpdate - - val counters = counters - val previous = _previous - val demand = previous[0] - val capacity = previous[1] - - previous[0] = nextDemand - previous[1] = nextCapacity - - if (delta <= 0) { - return - } - - val deltaS = delta * D_MS_TO_S - val total = demand * deltaS - val work = capacity * deltaS - val actualWork = ctx.rate * deltaS - - counters.increment(work, actualWork, (total - actualWork)) - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt deleted file mode 100644 index a48ac18e..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSource.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -/** - * A source of flow that is consumed by a [FlowConsumer]. - * - * Implementations of this interface should be considered stateful and must be assumed not to be re-usable - * (concurrently) for multiple [FlowConsumer]s, unless explicitly said otherwise. - */ -public interface FlowSource { - /** - * This method is invoked when the source is started. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the provider finished. - */ - public fun onStart(conn: FlowConnection, now: Long) {} - - /** - * This method is invoked when the source is finished. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the source finished. - */ - public fun onStop(conn: FlowConnection, now: Long) {} - - /** - * This method is invoked when the source is pulled. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the pull is occurring. - * @return The duration after which the resource consumer should be pulled again. - */ - public fun onPull(conn: FlowConnection, now: Long): Long - - /** - * This method is invoked when the flow graph has converged into a steady-state system. - * - * Make sure to enable [FlowConnection.shouldSourceConverge] if you need this callback. By default, this method - * will not be invoked. - * - * @param conn The connection between the source and consumer. - * @param now The virtual timestamp in milliseconds at which the system converged. - */ - public fun onConverge(conn: FlowConnection, now: Long) {} -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt deleted file mode 100644 index 450195ec..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -/** - * Constant for converting milliseconds into seconds. - */ -internal const val D_MS_TO_S = 1 / 1000.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt deleted file mode 100644 index 97d56fff..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -/** - * States of the flow connection. - */ -internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source -internal const val ConnActive = 1 // Connection is active and the source is currently being consumed -internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore -internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection - -/** - * Flags of the flow connection - */ -internal const val ConnPulled = 1 shl 2 // The source should be pulled -internal const val ConnPushed = 1 shl 3 // The source has pushed a value -internal const val ConnClose = 1 shl 4 // The connection should be closed -internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active -internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending -internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending -internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source -internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer -internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt deleted file mode 100644 index fba3af5f..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import mu.KotlinLogging -import org.opendc.simulator.flow.FlowConsumerContext -import org.opendc.simulator.flow.FlowConsumerLogic -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.batch -import java.util.ArrayDeque -import kotlin.math.min - -/** - * The logging instance of this connection. - */ -private val logger = KotlinLogging.logger {} - -/** - * Implementation of a [FlowConnection] managing the communication between flow sources and consumers. - */ -internal class FlowConsumerContextImpl( - private val engine: FlowEngineImpl, - private val source: FlowSource, - private val logic: FlowConsumerLogic -) : FlowConsumerContext { - /** - * The capacity of the connection. - */ - override var capacity: Double - get() = _capacity - set(value) { - val oldValue = _capacity - - // Only changes will be propagated - if (value != oldValue) { - _capacity = value - pull() - } - } - private var _capacity: Double = 0.0 - - /** - * The current processing rate of the connection. - */ - override val rate: Double - get() = _rate - private var _rate = 0.0 - - /** - * The current flow processing demand. - */ - override val demand: Double - get() = _demand - private var _demand: Double = 0.0 // The current (pending) demand of the source - - /** - * The deadline of the source. - */ - override val deadline: Long - get() = _deadline - private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer - - /** - * Flags to control the convergence of the consumer and source. - */ - override var shouldSourceConverge: Boolean - get() = _flags and ConnConvergeSource == ConnConvergeSource - set(value) { - _flags = - if (value) { - _flags or ConnConvergeSource - } else { - _flags and ConnConvergeSource.inv() - } - } - override var shouldConsumerConverge: Boolean - get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer - set(value) { - _flags = - if (value) { - _flags or ConnConvergeConsumer - } else { - _flags and ConnConvergeConsumer.inv() - } - } - - /** - * Flag to control the timers on the [FlowSource] - */ - override var enableTimers: Boolean - get() = _flags and ConnDisableTimers != ConnDisableTimers - set(value) { - _flags = - if (!value) { - _flags or ConnDisableTimers - } else { - _flags and ConnDisableTimers.inv() - } - } - - /** - * The clock to track simulation time. - */ - private val _clock = engine.clock - - /** - * The flags of the flow connection, indicating certain actions. - */ - private var _flags: Int = 0 - - /** - * The timers at which the context is scheduled to be interrupted. - */ - private var _timer: Long = Long.MAX_VALUE - private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5) - - override fun start() { - check(_flags and ConnState == ConnPending) { "Consumer is already started" } - engine.batch { - val now = _clock.millis() - source.onStart(this, now) - - // Mark the connection as active and pulled - val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled - scheduleImmediate(now, newFlags) - } - } - - override fun close() { - val flags = _flags - if (flags and ConnState == ConnClosed) { - return - } - - // Toggle the close bit. In case no update is active, schedule a new update. - if (flags and ConnUpdateActive == 0) { - val now = _clock.millis() - scheduleImmediate(now, flags or ConnClose) - } else { - _flags = flags or ConnClose - } - } - - override fun pull(now: Long) { - val flags = _flags - if (flags and ConnState != ConnActive) { - return - } - - // Mark connection as pulled - scheduleImmediate(now, flags or ConnPulled) - } - - override fun pull() { - pull(_clock.millis()) - } - - override fun pullSync(now: Long) { - val flags = _flags - - // Do not attempt to flush the connection if the connection is closed or an update is already active - if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) { - return - } - - if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { - engine.scheduleSync(now, this) - } - } - - override fun push(rate: Double) { - if (_demand == rate) { - return - } - - _demand = rate - - val flags = _flags - - if (flags and ConnUpdateActive != 0) { - // If an update is active, it will already get picked up at the end of the update - _flags = flags or ConnPushed - } else { - // Invalidate only if no update is active - scheduleImmediate(_clock.millis(), flags or ConnPushed) - } - } - - /** - * Update the state of the flow connection. - * - * @param now The current virtual timestamp. - * @param visited The queue of connections that have been visited during the cycle. - * @param timerQueue The queue of all pending timers. - * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update. - */ - fun doUpdate( - now: Long, - visited: FlowDeque, - timerQueue: FlowTimerQueue, - isImmediate: Boolean - ) { - var flags = _flags - - // Precondition: The flow connection must be active - if (flags and ConnState != ConnActive) { - return - } - - val deadline = _deadline - val reachedDeadline = deadline == now - var newDeadline: Long - var hasUpdated = false - - try { - // Pull the source if (1) `pull` is called or (2) the timer of the source has expired - newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) { - // Update state before calling into the outside world, so it observes a consistent state - _flags = (flags and ConnPulled.inv()) or ConnUpdateActive - hasUpdated = true - - val duration = source.onPull(this, now) - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - flags = _flags - - if (duration != Long.MAX_VALUE) { - now + duration - } else { - duration - } - } else { - deadline - } - - // Make the new deadline available for the consumer if it has changed - if (newDeadline != deadline) { - _deadline = newDeadline - } - - // Push to the consumer if the rate of the source has changed (after a call to `push`) - if (flags and ConnPushed != 0) { - // Update state before calling into the outside world, so it observes a consistent state - _flags = (flags and ConnPushed.inv()) or ConnUpdateActive - hasUpdated = true - - logic.onPush(this, now, _demand) - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - flags = _flags - } - - // Check whether the source or consumer have tried to close the connection - if (flags and ConnClose != 0) { - hasUpdated = true - - // The source has called [FlowConnection.close], so clean up the connection - doStopSource(now) - - // IMPORTANT: Re-fetch the flags after the callback might have changed those - // We now also mark the connection as closed - flags = (_flags and ConnState.inv()) or ConnClosed - - _demand = 0.0 - newDeadline = Long.MAX_VALUE - } - } catch (cause: Throwable) { - hasUpdated = true - - // Clean up the connection - doFailSource(now, cause) - - // Mark the connection as closed - flags = (flags and ConnState.inv()) or ConnClosed - - _demand = 0.0 - newDeadline = Long.MAX_VALUE - } - - // Check whether the connection needs to be added to the visited queue. This is the case when: - // (1) An update was performed (either a push or a pull) - // (2) Either the source or consumer want to converge, and - // (3) Convergence is not already pending (ConnConvergePending) - if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) { - visited.add(this) - flags = flags or ConnConvergePending - } - - // Compute the new flow rate of the connection - // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value - _rate = min(_capacity, _demand) - - // Indicate that no update is active anymore and flush the flags - _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv() - - val pendingTimers = _pendingTimers - - // Prune the head timer if this is a delayed update - val timer = if (!isImmediate) { - // Invariant: Any pending timer should only point to a future timestamp - val timer = pendingTimers.poll() ?: Long.MAX_VALUE - _timer = timer - timer - } else { - _timer - } - - // Check whether we need to schedule a new timer for this connection. That is the case when: - // (1) The deadline is valid (not the maximum value) - // (2) The connection is active - // (3) Timers are not disabled for the source - // (4) The current active timer for the connection points to a later deadline - if (newDeadline == Long.MAX_VALUE || - flags and ConnState != ConnActive || - flags and ConnDisableTimers != 0 || - (timer != Long.MAX_VALUE && newDeadline >= timer) - ) { - // Ignore any deadline scheduled at the maximum value - // This indicates that the source does not want to register a timer - return - } - - // Construct a timer for the new deadline and add it to the global queue of timers - _timer = newDeadline - timerQueue.add(this, newDeadline) - - // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers - if (timer != Long.MAX_VALUE) { - pendingTimers.addFirst(timer) - } - } - - /** - * This method is invoked when the system converges into a steady state. - */ - fun onConverge(now: Long) { - try { - val flags = _flags - - // The connection is converging now, so unset the convergence pending flag - _flags = flags and ConnConvergePending.inv() - - // Call the source converge callback if it has enabled convergence - if (flags and ConnConvergeSource != 0) { - source.onConverge(this, now) - } - - // Call the consumer callback if it has enabled convergence - if (flags and ConnConvergeConsumer != 0) { - logic.onConverge(this, now) - } - } catch (cause: Throwable) { - // Invoke the finish callbacks - doFailSource(now, cause) - - // Mark the connection as closed - _flags = (_flags and ConnState.inv()) or ConnClosed - _demand = 0.0 - _deadline = Long.MAX_VALUE - } - } - - override fun toString(): String = "FlowConsumerContextImpl[capacity=$capacity,rate=$_rate]" - - /** - * Stop the [FlowSource]. - */ - private fun doStopSource(now: Long) { - try { - source.onStop(this, now) - doFinishConsumer(now, null) - } catch (cause: Throwable) { - doFinishConsumer(now, cause) - } - } - - /** - * Fail the [FlowSource]. - */ - private fun doFailSource(now: Long, cause: Throwable) { - try { - source.onStop(this, now) - } catch (e: Throwable) { - e.addSuppressed(cause) - doFinishConsumer(now, e) - } - } - - /** - * Finish the consumer. - */ - private fun doFinishConsumer(now: Long, cause: Throwable?) { - try { - logic.onFinish(this, now, cause) - } catch (e: Throwable) { - e.addSuppressed(cause) - logger.error(e) { "Uncaught exception" } - } - } - - /** - * Schedule an immediate update for this connection. - */ - private fun scheduleImmediate(now: Long, flags: Int) { - // In case an immediate update is already scheduled, no need to do anything - if (flags and ConnUpdatePending != 0) { - _flags = flags - return - } - - // Mark the connection that there is an update pending - _flags = flags or ConnUpdatePending - - engine.scheduleImmediate(now, this) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt deleted file mode 100644 index 403a9aec..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import java.util.ArrayDeque - -/** - * A specialized [ArrayDeque] that tracks the [FlowConsumerContextImpl] instances that have updated in an interpreter - * cycle. - * - * By using a specialized class, we reduce the overhead caused by type-erasure. - */ -internal class FlowDeque(initialCapacity: Int = 256) { - /** - * The array of elements in the queue. - */ - private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity) - private var _head = 0 - private var _tail = 0 - - /** - * Determine whether this queue is not empty. - */ - fun isNotEmpty(): Boolean { - return _head != _tail - } - - /** - * Add the specified [ctx] to the queue. - */ - fun add(ctx: FlowConsumerContextImpl) { - val es = _elements - var tail = _tail - - es[tail] = ctx - - tail = inc(tail, es.size) - _tail = tail - - if (_head == tail) { - doubleCapacity() - } - } - - /** - * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty. - */ - fun poll(): FlowConsumerContextImpl? { - val es = _elements - val head = _head - val ctx = es[head] - - if (ctx != null) { - es[head] = null - _head = inc(head, es.size) - } - - return ctx - } - - /** - * Clear the queue. - */ - fun clear() { - _elements.fill(null) - _head = 0 - _tail = 0 - } - - private fun inc(i: Int, modulus: Int): Int { - var x = i - if (++x >= modulus) { - x = 0 - } - return x - } - - /** - * Doubles the capacity of this deque - */ - private fun doubleCapacity() { - assert(_head == _tail) - val p = _head - val n = _elements.size - val r = n - p // number of elements to the right of p - - val newCapacity = n shl 1 - check(newCapacity >= 0) { "Sorry, deque too big" } - - val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity) - - _elements.copyInto(a, 0, p, n) - _elements.copyInto(a, r, 0, p) - - _elements = a - _head = 0 - _tail = n - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt deleted file mode 100644 index 6fd1ef31..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import kotlinx.coroutines.Delay -import kotlinx.coroutines.DisposableHandle -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Runnable -import org.opendc.simulator.flow.FlowConsumerContext -import org.opendc.simulator.flow.FlowConsumerLogic -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSource -import java.time.Clock -import java.util.ArrayDeque -import kotlin.coroutines.ContinuationInterceptor -import kotlin.coroutines.CoroutineContext - -/** - * Internal implementation of the [FlowEngine] interface. - * - * @param context The coroutine context to use. - * @param clock The virtual simulation clock. - */ -internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable { - /** - * The [Delay] instance that provides scheduled execution of [Runnable]s. - */ - @OptIn(InternalCoroutinesApi::class) - private val delay = requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } - - /** - * The queue of connection updates that are scheduled for immediate execution. - */ - private val queue = FlowDeque() - - /** - * A priority queue containing the connection updates to be scheduled in the future. - */ - private val futureQueue = FlowTimerQueue() - - /** - * The stack of engine invocations to occur in the future. - */ - private val futureInvocations = ArrayDeque<Invocation>() - - /** - * The systems that have been visited during the engine cycle. - */ - private val visited = FlowDeque() - - /** - * The index in the batch stack. - */ - private var batchIndex = 0 - - /** - * The virtual [Clock] of this engine. - */ - override val clock: Clock - get() = _clock - private val _clock: Clock = clock - - /** - * Update the specified [ctx] synchronously. - */ - fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (batchIndex > 0) { - return - } - - doRunEngine(now) - } - - /** - * Enqueue the specified [ctx] to be updated immediately during the active engine cycle. - * - * This method should be used when the state of a flow context is invalidated/interrupted and needs to be - * re-computed. In case no engine is currently active, the engine will be started. - */ - fun scheduleImmediate(now: Long, ctx: FlowConsumerContextImpl) { - queue.add(ctx) - - // In-case the engine is already running in the call-stack, return immediately. The changes will be picked - // up by the active engine. - if (batchIndex > 0) { - return - } - - doRunEngine(now) - } - - override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) - - override fun pushBatch() { - batchIndex++ - } - - override fun popBatch() { - try { - // Flush the work if the engine is not already running - if (batchIndex == 1 && queue.isNotEmpty()) { - doRunEngine(_clock.millis()) - } - } finally { - batchIndex-- - } - } - - /* Runnable */ - override fun run() { - val invocation = futureInvocations.poll() // Clear invocation from future invocation queue - doRunEngine(invocation.timestamp) - } - - /** - * Run all the enqueued actions for the specified [timestamp][now]. - */ - private fun doRunEngine(now: Long) { - val queue = queue - val futureQueue = futureQueue - val futureInvocations = futureInvocations - val visited = visited - - try { - // Increment batch index so synchronous calls will not launch concurrent engine invocations - batchIndex++ - - // Execute all scheduled updates at current timestamp - while (true) { - val ctx = futureQueue.poll(now) ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = false) - } - - // Repeat execution of all immediate updates until the system has converged to a steady-state - // We have to take into account that the onConverge callback can also trigger new actions. - do { - // Execute all immediate updates - while (true) { - val ctx = queue.poll() ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) - } - - while (true) { - val ctx = visited.poll() ?: break - ctx.onConverge(now) - } - } while (queue.isNotEmpty()) - } finally { - // Decrement batch index to indicate no engine is active at the moment - batchIndex-- - } - - // Schedule an engine invocation for the next update to occur. - val headDeadline = futureQueue.peekDeadline() - if (headDeadline != Long.MAX_VALUE) { - trySchedule(now, futureInvocations, headDeadline) - } - } - - /** - * Try to schedule an engine invocation at the specified [target]. - * - * @param now The current virtual timestamp. - * @param target The virtual timestamp at which the engine invocation should happen. - * @param scheduled The queue of scheduled invocations. - */ - private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { - val head = scheduled.peek() - - // Only schedule a new scheduler invocation in case the target is earlier than all other pending - // scheduler invocations - if (head == null || target < head.timestamp) { - @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout(target - now, this, context) - scheduled.addFirst(Invocation(target, handle)) - } - } - - /** - * A future engine invocation. - * - * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case - * the invocation is not needed anymore, it can be cancelled via [cancel]. - */ - private class Invocation( - @JvmField val timestamp: Long, - @JvmField val handle: DisposableHandle - ) { - /** - * Cancel the engine invocation. - */ - fun cancel() = handle.dispose() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt deleted file mode 100644 index 47061a91..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -/** - * Specialized priority queue for flow timers. - * - * By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation - * being generic. - */ -internal class FlowTimerQueue(initialCapacity: Int = 256) { - /** - * The binary heap of deadlines. - */ - private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE } - - /** - * The binary heap of [FlowConsumerContextImpl]s. - */ - private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity) - - /** - * The number of elements in the priority queue. - */ - private var size = 0 - - /** - * Register a timer for [ctx] with [deadline]. - */ - fun add(ctx: FlowConsumerContextImpl, deadline: Long) { - val i = size - var deadlines = _deadlines - if (i >= deadlines.size) { - grow() - // Re-fetch the resized array - deadlines = _deadlines - } - - siftUp(deadlines, _pending, i, ctx, deadline) - - size = i + 1 - } - - /** - * Update all pending [FlowConsumerContextImpl]s at the timestamp [now]. - */ - fun poll(now: Long): FlowConsumerContextImpl? { - if (size == 0) { - return null - } - - val deadlines = _deadlines - val deadline = deadlines[0] - - if (now < deadline) { - return null - } - - val pending = _pending - val res = pending[0] - val s = --size - - val nextDeadline = deadlines[s] - val next = pending[s]!! - - // Clear the last element of the queue - pending[s] = null - deadlines[s] = Long.MIN_VALUE - - if (s != 0) { - siftDown(deadlines, pending, next, nextDeadline) - } - - return res - } - - /** - * Find the earliest deadline in the queue. - */ - fun peekDeadline(): Long { - return if (size == 0) Long.MAX_VALUE else _deadlines[0] - } - - /** - * Increases the capacity of the array. - */ - private fun grow() { - val oldCapacity = _deadlines.size - // Double size if small; else grow by 50% - val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1 - - _deadlines = _deadlines.copyOf(newCapacity) - _pending = _pending.copyOf(newCapacity) - } - - /** - * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is - * greater than or equal to its parent, or is the root. - * - * @param deadlines The heap of deadlines. - * @param pending The heap of contexts. - * @param pos The position to fill. - * @param ctx The [FlowConsumerContextImpl] to insert. - * @param deadline The deadline of the context. - */ - private fun siftUp( - deadlines: LongArray, - pending: Array<FlowConsumerContextImpl?>, - pos: Int, - ctx: FlowConsumerContextImpl, - deadline: Long - ) { - var k = pos - - while (k > 0) { - val parent = (k - 1) ushr 1 - val parentDeadline = deadlines[parent] - - if (deadline >= parentDeadline) { - break - } - - deadlines[k] = parentDeadline - pending[k] = pending[parent] - - k = parent - } - - deadlines[k] = deadline - pending[k] = ctx - } - - /** - * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it - * is less than or equal to its children or is a leaf. - * - * @param deadlines The heap of deadlines. - * @param pending The heap of contexts. - * @param ctx The [FlowConsumerContextImpl] to insert. - * @param deadline The deadline of the context. - */ - private fun siftDown( - deadlines: LongArray, - pending: Array<FlowConsumerContextImpl?>, - ctx: FlowConsumerContextImpl, - deadline: Long - ) { - var k = 0 - val size = size - val half = size ushr 1 - - while (k < half) { - var child = (k shl 1) + 1 - - var childDeadline = deadlines[child] - val right = child + 1 - - if (right < size) { - val rightDeadline = deadlines[right] - - if (childDeadline > rightDeadline) { - child = right - childDeadline = rightDeadline - } - } - - if (deadline <= childDeadline) { - break - } - - deadlines[k] = childDeadline - pending[k] = pending[child] - - k = child - } - - deadlines[k] = deadline - pending[k] = ctx - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt deleted file mode 100644 index c320a362..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import org.opendc.simulator.flow.FlowCounters - -/** - * Mutable implementation of the [FlowCounters] interface. - */ -public class MutableFlowCounters : FlowCounters { - override val demand: Double - get() = _counters[0] - override val actual: Double - get() = _counters[1] - override val remaining: Double - get() = _counters[2] - private val _counters = DoubleArray(3) - - override fun reset() { - _counters.fill(0.0) - } - - public fun increment(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters[0] += demand - counters[1] += actual - counters[2] += remaining - } - - override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt deleted file mode 100644 index 8752c559..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConsumer -import org.opendc.simulator.flow.FlowCounters -import org.opendc.simulator.flow.FlowSource - -/** - * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. - */ -public interface FlowMultiplexer { - /** - * The maximum number of inputs supported by the multiplexer. - */ - public val maxInputs: Int - - /** - * The maximum number of outputs supported by the multiplexer. - */ - public val maxOutputs: Int - - /** - * The inputs of the multiplexer that can be used to consume sources. - */ - public val inputs: Set<FlowConsumer> - - /** - * The outputs of the multiplexer over which the flows will be distributed. - */ - public val outputs: Set<FlowSource> - - /** - * The actual processing rate of the multiplexer. - */ - public val rate: Double - - /** - * The demanded processing rate of the input. - */ - public val demand: Double - - /** - * The capacity of the outputs. - */ - public val capacity: Double - - /** - * The flow counters to track the flow metrics of all multiplexer inputs. - */ - public val counters: FlowCounters - - /** - * Create a new input on this multiplexer with a coupled capacity. - */ - public fun newInput(): FlowConsumer - - /** - * Create a new input on this multiplexer with the specified [capacity]. - * - * @param capacity The capacity of the input. - */ - public fun newInput(capacity: Double): FlowConsumer - - /** - * Remove [input] from this multiplexer. - */ - public fun removeInput(input: FlowConsumer) - - /** - * Create a new output on this multiplexer. - */ - public fun newOutput(): FlowSource - - /** - * Remove [output] from this multiplexer. - */ - public fun removeOutput(output: FlowSource) - - /** - * Clear all inputs and outputs from the multiplexer. - */ - public fun clear() - - /** - * Clear the inputs of the multiplexer. - */ - public fun clearInputs() - - /** - * Clear the outputs of the multiplexer. - */ - public fun clearOutputs() - - /** - * Flush the counters of the multiplexer. - */ - public fun flushCounters() - - /** - * Flush the counters of the specified [input]. - */ - public fun flushCounters(input: FlowConsumer) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt deleted file mode 100644 index a863e3ad..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexerFactory.kt +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2022 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowEngine - -/** - * Factory interface for a [FlowMultiplexer] implementation. - */ -public fun interface FlowMultiplexerFactory { - /** - * Construct a new [FlowMultiplexer] using the specified [engine] and [listener]. - */ - public fun newMultiplexer(engine: FlowEngine, listener: FlowConvergenceListener?): FlowMultiplexer - - public companion object { - /** - * A [FlowMultiplexerFactory] constructing a [MaxMinFlowMultiplexer]. - */ - private val MAX_MIN_FACTORY = FlowMultiplexerFactory { engine, listener -> MaxMinFlowMultiplexer(engine, listener) } - - /** - * A [FlowMultiplexerFactory] constructing a [ForwardingFlowMultiplexer]. - */ - private val FORWARDING_FACTORY = FlowMultiplexerFactory { engine, listener -> ForwardingFlowMultiplexer(engine, listener) } - - /** - * Return a [FlowMultiplexerFactory] that returns [MaxMinFlowMultiplexer] instances. - */ - @JvmStatic - public fun maxMinMultiplexer(): FlowMultiplexerFactory = MAX_MIN_FACTORY - - /** - * Return a [ForwardingFlowMultiplexer] that returns [ForwardingFlowMultiplexer] instances. - */ - @JvmStatic - public fun forwardingMultiplexer(): FlowMultiplexerFactory = FORWARDING_FACTORY - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt deleted file mode 100644 index 53f94a94..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowConsumer -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowCounters -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowForwarder -import org.opendc.simulator.flow.FlowSource -import java.util.ArrayDeque - -/** - * A [FlowMultiplexer] implementation that allocates inputs to the outputs of the multiplexer exclusively. This means - * that a single input is directly connected to an output and that the multiplexer can only support as many - * inputs as outputs. - * - * @param engine The [FlowEngine] driving the simulation. - * @param listener The convergence listener of the multiplexer. - */ -public class ForwardingFlowMultiplexer( - private val engine: FlowEngine, - private val listener: FlowConvergenceListener? = null -) : FlowMultiplexer, FlowConvergenceListener { - - override val maxInputs: Int - get() = _outputs.size - - override val maxOutputs: Int = Int.MAX_VALUE - - override val inputs: Set<FlowConsumer> - get() = _inputs - private val _inputs = mutableSetOf<Input>() - - override val outputs: Set<FlowSource> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - private val _availableOutputs = ArrayDeque<Output>() - - override val counters: FlowCounters = object : FlowCounters { - override val demand: Double - get() = _outputs.sumOf { it.forwarder.counters.demand } - override val actual: Double - get() = _outputs.sumOf { it.forwarder.counters.actual } - override val remaining: Double - get() = _outputs.sumOf { it.forwarder.counters.remaining } - - override fun reset() { - for (output in _outputs) { - output.forwarder.counters.reset() - } - } - - override fun toString(): String = "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" - } - - override val rate: Double - get() = _outputs.sumOf { it.forwarder.rate } - - override val demand: Double - get() = _outputs.sumOf { it.forwarder.demand } - - override val capacity: Double - get() = _outputs.sumOf { it.forwarder.capacity } - - override fun newInput(): FlowConsumer { - val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } - val input = Input(output) - _inputs += input - return input - } - - override fun newInput(capacity: Double): FlowConsumer = newInput() - - override fun removeInput(input: FlowConsumer) { - if (!_inputs.remove(input)) { - return - } - - val output = (input as Input).output - output.forwarder.cancel() - _availableOutputs += output - } - - override fun newOutput(): FlowSource { - val forwarder = FlowForwarder(engine, this) - val output = Output(forwarder) - - _outputs += output - return output - } - - override fun removeOutput(output: FlowSource) { - if (!_outputs.remove(output)) { - return - } - - val forwarder = (output as Output).forwarder - forwarder.close() - } - - override fun clearInputs() { - for (input in _inputs) { - val output = input.output - output.forwarder.cancel() - _availableOutputs += output - } - - _inputs.clear() - } - - override fun clearOutputs() { - for (output in _outputs) { - output.forwarder.cancel() - } - _outputs.clear() - _availableOutputs.clear() - } - - override fun clear() { - clearOutputs() - clearInputs() - } - - override fun flushCounters() {} - - override fun flushCounters(input: FlowConsumer) {} - - override fun onConverge(now: Long) { - listener?.onConverge(now) - } - - /** - * An input on the multiplexer. - */ - private inner class Input(@JvmField val output: Output) : FlowConsumer by output.forwarder { - override fun toString(): String = "ForwardingFlowMultiplexer.Input" - } - - /** - * An output on the multiplexer. - */ - private inner class Output(@JvmField val forwarder: FlowForwarder) : FlowSource by forwarder { - override fun onStart(conn: FlowConnection, now: Long) { - _availableOutputs += this - forwarder.onStart(conn, now) - } - - override fun onStop(conn: FlowConnection, now: Long) { - forwarder.cancel() - forwarder.onStop(conn, now) - } - - override fun toString(): String = "ForwardingFlowMultiplexer.Output" - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt deleted file mode 100644 index d9c6f893..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ /dev/null @@ -1,811 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowConsumer -import org.opendc.simulator.flow.FlowConsumerContext -import org.opendc.simulator.flow.FlowConsumerLogic -import org.opendc.simulator.flow.FlowConvergenceListener -import org.opendc.simulator.flow.FlowCounters -import org.opendc.simulator.flow.FlowEngine -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.internal.D_MS_TO_S -import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.min - -/** - * A [FlowMultiplexer] implementation that multiplexes flows over the available outputs using max-min fair sharing. - * - * @param engine The [FlowEngine] to drive the flow simulation. - * @param parent The parent flow system of the multiplexer. - */ -public class MaxMinFlowMultiplexer( - private val engine: FlowEngine, - parent: FlowConvergenceListener? = null -) : FlowMultiplexer { - - override val maxInputs: Int = Int.MAX_VALUE - - override val maxOutputs: Int = Int.MAX_VALUE - - /** - * The inputs of the multiplexer. - */ - override val inputs: Set<FlowConsumer> - get() = _inputs - private val _inputs = mutableSetOf<Input>() - - /** - * The outputs of the multiplexer. - */ - override val outputs: Set<FlowSource> - get() = _outputs - private val _outputs = mutableSetOf<Output>() - - /** - * The flow counters of this multiplexer. - */ - public override val counters: FlowCounters - get() = scheduler.counters - - /** - * The actual processing rate of the multiplexer. - */ - public override val rate: Double - get() = scheduler.rate - - /** - * The demanded processing rate of the input. - */ - public override val demand: Double - get() = scheduler.demand - - /** - * The capacity of the outputs. - */ - public override val capacity: Double - get() = scheduler.capacity - - /** - * The [Scheduler] instance of this multiplexer. - */ - private val scheduler = Scheduler(engine, parent) - - override fun newInput(): FlowConsumer { - return newInput(isCoupled = true, Double.POSITIVE_INFINITY) - } - - override fun newInput(capacity: Double): FlowConsumer { - return newInput(isCoupled = false, capacity) - } - - private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer { - val provider = Input(engine, scheduler, isCoupled, initialCapacity) - _inputs.add(provider) - return provider - } - - override fun removeInput(input: FlowConsumer) { - if (!_inputs.remove(input)) { - return - } - // This cast should always succeed since only `Input` instances should be added to `_inputs` - (input as Input).close() - } - - override fun newOutput(): FlowSource { - val output = Output(scheduler) - _outputs.add(output) - return output - } - - override fun removeOutput(output: FlowSource) { - if (!_outputs.remove(output)) { - return - } - - // This cast should always succeed since only `Output` instances should be added to `_outputs` - (output as Output).cancel() - } - - override fun clearInputs() { - for (input in _inputs) { - input.cancel() - } - _inputs.clear() - } - - override fun clearOutputs() { - for (output in _outputs) { - output.cancel() - } - _outputs.clear() - } - - override fun clear() { - clearOutputs() - clearInputs() - } - - override fun flushCounters() { - scheduler.updateCounters(engine.clock.millis()) - } - - override fun flushCounters(input: FlowConsumer) { - (input as Input).doUpdateCounters(engine.clock.millis()) - } - - /** - * Helper class containing the scheduler state. - */ - private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { - /** - * The flow counters of this scheduler. - */ - @JvmField val counters = MutableFlowCounters() - - /** - * The flow rate of the multiplexer. - */ - @JvmField var rate = 0.0 - - /** - * The demand for the multiplexer. - */ - @JvmField var demand = 0.0 - - /** - * The capacity of the multiplexer. - */ - @JvmField var capacity = 0.0 - - /** - * An [Output] that is used to activate the scheduler. - */ - @JvmField var activationOutput: Output? = null - - /** - * The active inputs registered with the scheduler. - */ - private val _activeInputs = mutableListOf<Input>() - - /** - * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList]. - */ - private var _inputArray = emptyArray<Input>() - - /** - * The active outputs registered with the scheduler. - */ - private val _activeOutputs = mutableListOf<Output>() - - /** - * Flag to indicate that the scheduler is active. - */ - private var _schedulerActive = false - - /** - * The last convergence timestamp and the input. - */ - private var _lastConverge: Long = Long.MIN_VALUE - private var _lastConvergeInput: Input? = null - - /** - * The simulation clock. - */ - private val _clock = engine.clock - - /** - * Register the specified [input] to this scheduler. - */ - fun registerInput(input: Input) { - _activeInputs.add(input) - _inputArray = _activeInputs.toTypedArray() - - val hasActivationOutput = activationOutput != null - - // Disable timers and convergence of the source if one of the output manages it - input.shouldConsumerConverge = !hasActivationOutput - input.enableTimers = !hasActivationOutput - - if (input.isCoupled) { - input.capacity = capacity - } - - trigger(_clock.millis()) - } - - /** - * De-register the specified [input] from this scheduler. - */ - fun deregisterInput(input: Input, now: Long) { - // Assign a new input responsible for handling the convergence events - if (_lastConvergeInput == input) { - _lastConvergeInput = null - } - - _activeInputs.remove(input) - - // Re-run scheduler to distribute new load - trigger(now) - } - - /** - * This method is invoked when one of the inputs converges. - */ - fun convergeInput(input: Input, now: Long) { - val lastConverge = _lastConverge - val lastConvergeInput = _lastConvergeInput - val parent = parent - - if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) { - _lastConverge = now - _lastConvergeInput = input - - parent.onConverge(now) - } - } - - /** - * Register the specified [output] to this scheduler. - */ - fun registerOutput(output: Output) { - _activeOutputs.add(output) - - updateCapacity() - updateActivationOutput() - } - - /** - * De-register the specified [output] from this scheduler. - */ - fun deregisterOutput(output: Output, now: Long) { - _activeOutputs.remove(output) - updateCapacity() - - trigger(now) - } - - /** - * This method is invoked when one of the outputs converges. - */ - fun convergeOutput(output: Output, now: Long) { - val parent = parent - - if (parent != null) { - _lastConverge = now - parent.onConverge(now) - } - - if (!output.isActive) { - output.isActivationOutput = false - updateActivationOutput() - } - } - - /** - * Trigger the scheduler of the multiplexer. - * - * @param now The current virtual timestamp of the simulation. - */ - fun trigger(now: Long) { - if (_schedulerActive) { - // No need to trigger the scheduler in case it is already active - return - } - - val activationOutput = activationOutput - - // We can run the scheduler in two ways: - // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input - // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. - // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only - // a few inputs and little changes at the same timestamp. - // We always pick for option (1) unless there are no outputs available. - if (activationOutput != null) { - activationOutput.pull(now) - return - } else { - runScheduler(now) - } - } - - /** - * Synchronously run the scheduler of the multiplexer. - */ - fun runScheduler(now: Long): Long { - return try { - _schedulerActive = true - doRunScheduler(now) - } finally { - _schedulerActive = false - } - } - - /** - * Recompute the capacity of the multiplexer. - */ - fun updateCapacity() { - val newCapacity = _activeOutputs.sumOf(Output::capacity) - - // No-op if the capacity is unchanged - if (capacity == newCapacity) { - return - } - - capacity = newCapacity - - for (input in _activeInputs) { - if (input.isCoupled) { - input.capacity = newCapacity - } - } - - // Sort outputs by their capacity - _activeOutputs.sort() - } - - /** - * Updates the output that is used for scheduler activation. - */ - private fun updateActivationOutput() { - val output = _activeOutputs.firstOrNull() - activationOutput = output - - if (output != null) { - output.isActivationOutput = true - } - - val hasActivationOutput = output != null - - for (input in _activeInputs) { - input.shouldConsumerConverge = !hasActivationOutput - input.enableTimers = !hasActivationOutput - } - } - - /** - * Schedule the inputs over the outputs. - * - * @return The deadline after which a new scheduling cycle should start. - */ - private fun doRunScheduler(now: Long): Long { - val activeInputs = _activeInputs - val activeOutputs = _activeOutputs - var inputArray = _inputArray - var inputSize = _inputArray.size - - // Update the counters of the scheduler - updateCounters(now) - - // If there is no work yet, mark the inputs as idle. - if (inputSize == 0) { - demand = 0.0 - rate = 0.0 - return Long.MAX_VALUE - } - - val capacity = capacity - var availableCapacity = capacity - var deadline = Long.MAX_VALUE - var demand = 0.0 - var shouldRebuild = false - - // Pull in the work of the inputs - for (i in 0 until inputSize) { - val input = inputArray[i] - - input.pullSync(now) - - // Remove inputs that have finished - if (!input.isActive) { - input.actualRate = 0.0 - shouldRebuild = true - } else { - demand += input.limit - deadline = min(deadline, input.deadline) - } - } - - // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs` - if (shouldRebuild) { - inputArray = activeInputs.toTypedArray() - inputSize = inputArray.size - _inputArray = inputArray - } - - val rate = if (demand > capacity) { - // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the - // constrained capacity across the inputs. - - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - inputArray.sort() - - // Divide the available output capacity fairly over the inputs using max-min fair sharing - for (i in 0 until inputSize) { - val input = inputArray[i] - val availableShare = availableCapacity / (inputSize - i) - val grantedRate = min(input.allowedRate, availableShare) - - availableCapacity -= grantedRate - input.actualRate = grantedRate - } - - capacity - availableCapacity - } else { - demand - } - - this.demand = demand - if (this.rate != rate) { - // Only update the outputs if the output rate has changed - this.rate = rate - - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - output.push(grantedSpeed) - } - } - - return deadline - } - - /** - * The previous capacity of the multiplexer. - */ - private var _previousCapacity = 0.0 - private var _previousUpdate = Long.MIN_VALUE - - /** - * Update the counters of the scheduler. - */ - fun updateCounters(now: Long) { - val previousCapacity = _previousCapacity - _previousCapacity = capacity - - val previousUpdate = _previousUpdate - _previousUpdate = now - - val delta = now - previousUpdate - if (delta <= 0) { - return - } - - val deltaS = delta * D_MS_TO_S - val demand = demand - val rate = rate - - counters.increment( - demand = demand * deltaS, - actual = rate * deltaS, - remaining = (previousCapacity - rate) * deltaS - ) - } - } - - /** - * An internal [FlowConsumer] implementation for multiplexer inputs. - */ - private class Input( - private val engine: FlowEngine, - private val scheduler: Scheduler, - @JvmField val isCoupled: Boolean, - initialCapacity: Double - ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { - /** - * A flag to indicate that the consumer is active. - */ - override val isActive: Boolean - get() = _ctx != null - - /** - * The demand of the consumer. - */ - override val demand: Double - get() = limit - - /** - * The processing rate of the consumer. - */ - override val rate: Double - get() = actualRate - - /** - * The capacity of the input. - */ - override var capacity: Double - get() = _capacity - set(value) { - allowedRate = min(limit, value) - _capacity = value - _ctx?.capacity = value - } - private var _capacity = initialCapacity - - /** - * The flow counters to track the flow metrics of the consumer. - */ - override val counters: FlowCounters - get() = _counters - private val _counters = MutableFlowCounters() - - /** - * A flag to enable timers for the input. - */ - var enableTimers: Boolean = true - set(value) { - field = value - _ctx?.enableTimers = value - } - - /** - * A flag to control whether the input should converge. - */ - var shouldConsumerConverge: Boolean = true - set(value) { - field = value - _ctx?.shouldConsumerConverge = value - } - - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - - /** - * The actual processing speed. - */ - @JvmField var actualRate: Double = 0.0 - - /** - * The processing rate that is allowed by the model constraints. - */ - @JvmField var allowedRate: Double = 0.0 - - /** - * The deadline of the input. - */ - val deadline: Long - get() = _ctx?.deadline ?: Long.MAX_VALUE - - /** - * The [FlowConsumerContext] that is currently running. - */ - private var _ctx: FlowConsumerContext? = null - - /** - * A flag to indicate that the input is closed. - */ - private var _isClosed: Boolean = false - - /** - * Close the input. - * - * This method is invoked when the user removes an input from the switch. - */ - fun close() { - _isClosed = true - cancel() - } - - /** - * Pull the source if necessary. - */ - fun pullSync(now: Long) { - _ctx?.pullSync(now) - } - - /* FlowConsumer */ - override fun startConsumer(source: FlowSource) { - check(!_isClosed) { "Cannot re-use closed input" } - check(_ctx == null) { "Consumer is in invalid state" } - - val ctx = engine.newContext(source, this) - _ctx = ctx - - ctx.capacity = capacity - scheduler.registerInput(this) - - ctx.start() - } - - override fun pull() { - _ctx?.pull() - } - - override fun cancel() { - _ctx?.close() - } - - /* FlowConsumerLogic */ - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - rate: Double - ) { - doUpdateCounters(now) - - val allowed = min(rate, capacity) - limit = rate - actualRate = allowed - allowedRate = allowed - - scheduler.trigger(now) - } - - override fun onFinish(ctx: FlowConsumerContext, now: Long, cause: Throwable?) { - doUpdateCounters(now) - - limit = 0.0 - actualRate = 0.0 - allowedRate = 0.0 - - scheduler.deregisterInput(this, now) - - _ctx = null - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long) { - scheduler.convergeInput(this, now) - } - - /* Comparable */ - override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) - - /** - * The timestamp that the counters where last updated. - */ - private var _lastUpdate = Long.MIN_VALUE - - /** - * Helper method to update the flow counters of the multiplexer. - */ - fun doUpdateCounters(now: Long) { - val lastUpdate = _lastUpdate - _lastUpdate = now - - val delta = (now - lastUpdate).coerceAtLeast(0) - if (delta <= 0L) { - return - } - - val actualRate = actualRate - - val deltaS = delta * D_MS_TO_S - val demand = limit * deltaS - val actual = actualRate * deltaS - val remaining = (_capacity - actualRate) * deltaS - - _counters.increment(demand, actual, remaining) - scheduler.counters.increment(0.0, 0.0, 0.0) - } - } - - /** - * An internal [FlowSource] implementation for multiplexer outputs. - */ - private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> { - /** - * The active [FlowConnection] of this source. - */ - private var _conn: FlowConnection? = null - - /** - * The capacity of this output. - */ - @JvmField var capacity: Double = 0.0 - - /** - * A flag to indicate that this output is the activation output. - */ - var isActivationOutput: Boolean - get() = _isActivationOutput - set(value) { - _isActivationOutput = value - _conn?.shouldSourceConverge = value - } - private var _isActivationOutput: Boolean = false - - /** - * A flag to indicate that the output is active. - */ - @JvmField var isActive = false - - /** - * Push the specified rate to the consumer. - */ - fun push(rate: Double) { - _conn?.push(rate) - } - - /** - * Cancel this output. - */ - fun cancel() { - _conn?.close() - } - - /** - * Pull this output. - */ - fun pull(now: Long) { - _conn?.pull(now) - } - - override fun onStart(conn: FlowConnection, now: Long) { - assert(_conn == null) { "Source running concurrently" } - _conn = conn - capacity = conn.capacity - isActive = true - - scheduler.registerOutput(this) - } - - override fun onStop(conn: FlowConnection, now: Long) { - _conn = null - capacity = 0.0 - isActive = false - - scheduler.deregisterOutput(this, now) - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val capacity = capacity - if (capacity != conn.capacity) { - this.capacity = capacity - scheduler.updateCapacity() - } - - return if (_isActivationOutput) { - // If this output is the activation output, synchronously run the scheduler and return the new deadline - val deadline = scheduler.runScheduler(now) - if (deadline == Long.MAX_VALUE) { - deadline - } else { - deadline - now - } - } else { - // Output is not the activation output, so trigger activation output and do not install timer for this - // output (by returning `Long.MAX_VALUE`) - scheduler.trigger(now) - - Long.MAX_VALUE - } - } - - override fun onConverge(conn: FlowConnection, now: Long) { - if (_isActivationOutput) { - scheduler.convergeOutput(this, now) - } - } - - override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt deleted file mode 100644 index 6cfcc82c..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FixedFlowSource.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource -import kotlin.math.roundToLong - -/** - * A [FlowSource] that contains a fixed [amount] and is pushed with a given [utilization]. - */ -public class FixedFlowSource(private val amount: Double, private val utilization: Double) : FlowSource { - - init { - require(amount >= 0.0) { "Amount must be positive" } - require(utilization > 0.0) { "Utilization must be positive" } - } - - private var remainingAmount = amount - private var lastPull: Long = 0L - - override fun onStart(conn: FlowConnection, now: Long) { - lastPull = now - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - val lastPull = lastPull - this.lastPull = now - val delta = (now - lastPull).coerceAtLeast(0) - - val consumed = conn.rate * delta / 1000.0 - val limit = conn.capacity * utilization - - remainingAmount -= consumed - - val duration = (remainingAmount / limit * 1000).roundToLong() - - return if (duration > 0) { - conn.push(limit) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt deleted file mode 100644 index b3191ad3..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceBarrier.kt +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -/** - * The [FlowSourceBarrier] is a barrier that allows multiple sources to wait for a select number of other sources to - * finish a pull, before proceeding its operation. - */ -public class FlowSourceBarrier(public val parties: Int) { - private var counter = 0 - - /** - * Enter the barrier and determine whether the caller is the last to reach the barrier. - * - * @return `true` if the caller is the last to reach the barrier, `false` otherwise. - */ - public fun enter(): Boolean { - val last = ++counter == parties - if (last) { - counter = 0 - return true - } - return false - } - - /** - * Reset the barrier. - */ - public fun reset() { - counter = 0 - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt deleted file mode 100644 index 80127fb5..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/FlowSourceRateAdapter.kt +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource - -/** - * Helper class to expose an observable [rate] field describing the flow rate of the source. - */ -public class FlowSourceRateAdapter( - private val delegate: FlowSource, - private val callback: (Double) -> Unit = {} -) : FlowSource by delegate { - /** - * The resource processing speed at this instant. - */ - public var rate: Double = 0.0 - private set(value) { - if (field != value) { - callback(value) - field = value - } - } - - init { - callback(0.0) - } - - override fun onStart(conn: FlowConnection, now: Long) { - conn.shouldSourceConverge = true - - delegate.onStart(conn, now) - } - - override fun onStop(conn: FlowConnection, now: Long) { - try { - delegate.onStop(conn, now) - } finally { - rate = 0.0 - } - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return delegate.onPull(conn, now) - } - - override fun onConverge(conn: FlowConnection, now: Long) { - try { - delegate.onConverge(conn, now) - } finally { - rate = conn.rate - } - } - - override fun toString(): String = "FlowSourceRateAdapter[delegate=$delegate]" -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt deleted file mode 100644 index c9a52128..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/source/TraceFlowSource.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowSource - -/** - * A [FlowSource] that replays a sequence of [Fragment], each indicating the flow rate for some period of time. - */ -public class TraceFlowSource(private val trace: Sequence<Fragment>) : FlowSource { - private var _iterator: Iterator<Fragment>? = null - private var _nextTarget = Long.MIN_VALUE - - override fun onStart(conn: FlowConnection, now: Long) { - check(_iterator == null) { "Source already running" } - _iterator = trace.iterator() - } - - override fun onStop(conn: FlowConnection, now: Long) { - _iterator = null - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - // Check whether the trace fragment was fully consumed, otherwise wait until we have done so - val nextTarget = _nextTarget - if (nextTarget > now) { - return now - nextTarget - } - - val iterator = checkNotNull(_iterator) - return if (iterator.hasNext()) { - val fragment = iterator.next() - _nextTarget = now + fragment.duration - conn.push(fragment.usage) - fragment.duration - } else { - conn.close() - Long.MAX_VALUE - } - } - - /** - * A fragment of the trace. - */ - public data class Fragment(val duration: Long, val usage: Double) -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt deleted file mode 100644 index f89133dd..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowConsumerContextTest.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import net.bytebuddy.matcher.ElementMatchers.any -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowConsumerContextImpl -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowConsumerContextImpl] class. - */ -class FlowConsumerContextTest { - @Test - fun testFlushWithoutCommand() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(1.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - - engine.scheduleSync(engine.clock.millis(), context) - } - - @Test - fun testDoubleStart() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(0.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - - context.start() - - assertThrows<IllegalStateException> { - context.start() - } - } - - @Test - fun testIdempotentCapacityChange() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (now == 0L) { - conn.push(1.0) - 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - }) - - val logic = object : FlowConsumerLogic {} - val context = FlowConsumerContextImpl(engine, consumer, logic) - context.capacity = 4200.0 - context.start() - context.capacity = 4200.0 - - verify(exactly = 1) { consumer.onPull(any(), any()) } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt deleted file mode 100644 index f75e5037..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowForwarderTest.kt +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import net.bytebuddy.matcher.ElementMatchers.any -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse -import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Disabled -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowForwarder] class. - */ -internal class FlowForwarderTest { - @Test - fun testCancelImmediately() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testCancel() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(object : FlowSource { - var isFirst = true - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 10 * 1000 - } else { - conn.close() - Long.MAX_VALUE - } - } - }) - - forwarder.close() - source.cancel() - } - - @Test - fun testState() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - } - - assertFalse(forwarder.isActive) - - forwarder.startConsumer(consumer) - assertTrue(forwarder.isActive) - - assertThrows<IllegalStateException> { forwarder.startConsumer(consumer) } - - forwarder.cancel() - assertFalse(forwarder.isActive) - - forwarder.close() - assertFalse(forwarder.isActive) - } - - @Test - fun testCancelPendingDelegate() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - - val consumer = spyk(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - }) - - forwarder.startConsumer(consumer) - forwarder.cancel() - - verify(exactly = 0) { consumer.onStop(any(), any()) } - } - - @Test - fun testCancelStartedDelegate() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - val consumer = spyk(FixedFlowSource(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - forwarder.cancel() - - verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any()) } - } - - @Test - fun testCancelPropagation() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - val consumer = spyk(FixedFlowSource(2000.0, 1.0)) - - source.startConsumer(forwarder) - yield() - forwarder.startConsumer(consumer) - yield() - source.cancel() - - verify(exactly = 1) { consumer.onStart(any(), any()) } - verify(exactly = 1) { consumer.onStop(any(), any()) } - } - - @Test - fun testExitPropagation() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - } - - source.startConsumer(forwarder) - forwarder.consume(consumer) - yield() - - assertFalse(forwarder.isActive) - } - - @Test - @Disabled // Due to Kotlin bug: https://github.com/mockk/mockk/issues/368 - fun testAdjustCapacity() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val sink = FlowSink(engine, 1.0) - - val source = spyk(FixedFlowSource(2.0, 1.0)) - sink.startConsumer(forwarder) - - coroutineScope { - launch { forwarder.consume(source) } - delay(1000) - sink.capacity = 0.5 - } - - assertEquals(3000, clock.millis()) - verify(exactly = 1) { source.onPull(any(), any()) } - } - - @Test - fun testCounters() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 1.0) - - val consumer = FixedFlowSource(2.0, 1.0) - source.startConsumer(forwarder) - - forwarder.consume(consumer) - - yield() - - assertAll( - { assertEquals(2.0, source.counters.actual) }, - { assertEquals(source.counters.actual, forwarder.counters.actual) { "Actual work" } }, - { assertEquals(source.counters.demand, forwarder.counters.demand) { "Work demand" } }, - { assertEquals(source.counters.remaining, forwarder.counters.remaining) { "Overcommitted work" } }, - { assertEquals(2000, clock.millis()) } - ) - } - - @Test - fun testCoupledExit() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - forwarder.consume(FixedFlowSource(2000.0, 1.0)) - - yield() - - assertFalse(source.isActive) - } - - @Test - fun testPullFailureCoupled() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine, isCoupled = true) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertFalse(source.isActive) - } - - @Test - fun testStartFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - - override fun onStart(conn: FlowConnection, now: Long) { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertTrue(source.isActive) - source.cancel() - } - - @Test - fun testConvergeFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val forwarder = FlowForwarder(engine) - val source = FlowSink(engine, 2000.0) - - launch { source.consume(forwarder) } - - try { - forwarder.consume(object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - conn.shouldSourceConverge = true - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - - override fun onConverge(conn: FlowConnection, now: Long) { - throw IllegalStateException("Test") - } - }) - } catch (cause: Throwable) { - // Ignore - } - - yield() - - assertTrue(source.isActive) - source.cancel() - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt deleted file mode 100644 index 746d752d..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/FlowSinkTest.kt +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import io.mockk.spyk -import io.mockk.verify -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FlowSink] class. - */ -internal class FlowSinkTest { - @Test - fun testSpeed() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(4200.0, 1.0) - - val res = mutableListOf<Double>() - val adapter = FlowSourceRateAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } - - @Test - fun testAdjustCapacity() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(engine, 1.0) - - val consumer = spyk(FixedFlowSource(2.0, 1.0)) - - coroutineScope { - launch { provider.consume(consumer) } - delay(1000) - provider.capacity = 0.5 - } - assertEquals(3000, clock.millis()) - verify(exactly = 3) { consumer.onPull(any(), any()) } - } - - @Test - fun testSpeedLimit() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 2.0) - - val res = mutableListOf<Double>() - val adapter = FlowSourceRateAdapter(consumer, res::add) - - provider.consume(adapter) - - assertEquals(listOf(0.0, capacity, 0.0), res) { "Speed is reported correctly" } - } - - /** - * Test to see whether no infinite recursion occurs when interrupting during [FlowSource.onStart] or - * [FlowSource.onPull]. - */ - @Test - fun testIntermediateInterrupt() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long { - conn.close() - return Long.MAX_VALUE - } - - override fun onStart(conn: FlowConnection, now: Long) { - conn.pull() - } - } - - provider.consume(consumer) - } - - @Test - fun testInterrupt() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - lateinit var resCtx: FlowConnection - - val consumer = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - resCtx = conn - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 4000 - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - launch { - yield() - resCtx.pull() - } - provider.consume(consumer) - - assertEquals(0, clock.millis()) - } - - @Test - fun testFailure() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onStart(conn: FlowConnection, now: Long) { - throw IllegalStateException("Hi") - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return Long.MAX_VALUE - } - } - - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } - - @Test - fun testExceptionPropagationOnNext() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - var isFirst = true - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - 1000 - } else { - throw IllegalStateException() - } - } - } - - assertThrows<IllegalStateException> { - provider.consume(consumer) - } - } - - @Test - fun testConcurrentConsumption() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 1.0) - - assertThrows<IllegalStateException> { - coroutineScope { - launch { provider.consume(consumer) } - provider.consume(consumer) - } - } - } - - @Test - fun testCancelDuringConsumption() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = FixedFlowSource(capacity, 1.0) - - launch { provider.consume(consumer) } - delay(500) - provider.cancel() - - yield() - - assertEquals(500, clock.millis()) - } - - @Test - fun testInfiniteSleep() { - assertThrows<IllegalStateException> { - runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - val capacity = 4200.0 - val provider = FlowSink(engine, capacity) - - val consumer = object : FlowSource { - override fun onPull(conn: FlowConnection, now: Long): Long = Long.MAX_VALUE - } - - provider.consume(consumer) - } - } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt deleted file mode 100644 index 2409e174..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexerTest.kt +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertAll -import org.junit.jupiter.api.assertThrows -import org.opendc.simulator.flow.FlowConnection -import org.opendc.simulator.flow.FlowForwarder -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.FlowSourceRateAdapter -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * Test suite for the [ForwardingFlowMultiplexer] class. - */ -internal class ForwardingFlowMultiplexerTest { - /** - * Test a trace workload. - */ - @Test - fun testTrace() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val speed = mutableListOf<Double>() - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - val forwarder = FlowForwarder(engine) - val adapter = FlowSourceRateAdapter(forwarder, speed::add) - source.startConsumer(adapter) - forwarder.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertAll( - { assertEquals(listOf(0.0, 28.0, 3200.0, 0.0, 183.0, 0.0), speed) { "Correct speed" } }, - { assertEquals(5 * 60L * 4000, clock.millis()) { "Took enough time" } } - ) - } - - /** - * Test runtime workload on hypervisor. - */ - @Test - fun testRuntimeWorkload() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = FixedFlowSource(duration * 3.2, 1.0) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - - assertEquals(duration, clock.millis()) { "Took enough time" } - } - - /** - * Test two workloads running sequentially. - */ - @Test - fun testTwoWorkloads() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L * 1000 - val workload = object : FlowSource { - var isFirst = true - - override fun onStart(conn: FlowConnection, now: Long) { - isFirst = true - } - - override fun onPull(conn: FlowConnection, now: Long): Long { - return if (isFirst) { - isFirst = false - conn.push(1.0) - duration - } else { - conn.close() - Long.MAX_VALUE - } - } - } - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - val provider = switch.newInput() - provider.consume(workload) - yield() - provider.consume(workload) - assertEquals(duration * 2, clock.millis()) { "Took enough time" } - } - - /** - * Test concurrent workloads on the machine. - */ - @Test - fun testConcurrentWorkloadFails() = runSimulation { - val engine = FlowEngineImpl(coroutineContext, clock) - - val switch = ForwardingFlowMultiplexer(engine) - val source = FlowSink(engine, 3200.0) - - source.startConsumer(switch.newOutput()) - - switch.newInput() - assertThrows<IllegalStateException> { switch.newInput() } - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt deleted file mode 100644 index a6bf8ad8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexerTest.kt +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.mux - -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.yield -import org.junit.jupiter.api.Assertions.assertAll -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.flow.source.FixedFlowSource -import org.opendc.simulator.flow.source.TraceFlowSource -import org.opendc.simulator.kotlin.runSimulation - -/** - * Test suite for the [FlowMultiplexer] implementations - */ -internal class MaxMinFlowMultiplexerTest { - @Test - fun testSmoke() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val switch = MaxMinFlowMultiplexer(scheduler) - - val sources = List(2) { FlowSink(scheduler, 2000.0) } - sources.forEach { it.startConsumer(switch.newOutput()) } - - val provider = switch.newInput() - val consumer = FixedFlowSource(2000.0, 1.0) - - try { - provider.consume(consumer) - yield() - } finally { - switch.clear() - } - } - - /** - * Test overcommitting of resources via the hypervisor with a single VM. - */ - @Test - fun testOvercommittedSingle() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workload = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val sink = FlowSink(scheduler, 3200.0) - val provider = switch.newInput() - - try { - sink.startConsumer(switch.newOutput()) - provider.consume(workload) - yield() - } finally { - switch.clear() - } - - assertAll( - { assertEquals(1113300.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1023300.0, switch.counters.actual, "Actual work does not match") }, - { assertEquals(2816700.0, switch.counters.remaining, "Remaining capacity does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } - - /** - * Test overcommitting of resources via the hypervisor with two VMs. - */ - @Test - fun testOvercommittedDual() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - - val duration = 5 * 60L - val workloadA = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3500.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 183.0) - ) - ) - val workloadB = - TraceFlowSource( - sequenceOf( - TraceFlowSource.Fragment(duration * 1000, 28.0), - TraceFlowSource.Fragment(duration * 1000, 3100.0), - TraceFlowSource.Fragment(duration * 1000, 0.0), - TraceFlowSource.Fragment(duration * 1000, 73.0) - ) - ) - - val switch = MaxMinFlowMultiplexer(scheduler) - val sink = FlowSink(scheduler, 3200.0) - val providerA = switch.newInput() - val providerB = switch.newInput() - - try { - sink.startConsumer(switch.newOutput()) - - coroutineScope { - launch { providerA.consume(workloadA) } - providerB.consume(workloadB) - } - - yield() - } finally { - switch.clear() - } - assertAll( - { assertEquals(2073600.0, switch.counters.demand, "Requested work does not match") }, - { assertEquals(1053600.0, switch.counters.actual, "Granted work does not match") }, - { assertEquals(2786400.0, switch.counters.remaining, "Remaining capacity does not match") }, - { assertEquals(1200000, clock.millis()) } - ) - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt b/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt deleted file mode 100644 index 552579ff..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/test/kotlin/org/opendc/simulator/flow/source/FixedFlowSourceTest.kt +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.source - -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Test -import org.opendc.simulator.flow.FlowSink -import org.opendc.simulator.flow.consume -import org.opendc.simulator.flow.internal.FlowEngineImpl -import org.opendc.simulator.kotlin.runSimulation - -/** - * A test suite for the [FixedFlowSource] class. - */ -internal class FixedFlowSourceTest { - @Test - fun testSmoke() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(scheduler, 1.0) - - val consumer = FixedFlowSource(1.0, 1.0) - - provider.consume(consumer) - assertEquals(1000, clock.millis()) - } - - @Test - fun testUtilization() = runSimulation { - val scheduler = FlowEngineImpl(coroutineContext, clock) - val provider = FlowSink(scheduler, 1.0) - - val consumer = FixedFlowSource(1.0, 0.5) - - provider.consume(consumer) - assertEquals(2000, clock.millis()) - } -} |
