diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
12 files changed, 702 insertions, 321 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt deleted file mode 100644 index b02426e3..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.FlowCountersImpl - -/** - * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. - */ -public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { - /** - * A flag to indicate that the flow consumer is active. - */ - public override val isActive: Boolean - get() = ctx != null - - /** - * The capacity of the consumer. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The current processing rate of the consumer. - */ - public override val rate: Double - get() = ctx?.rate ?: 0.0 - - /** - * The flow processing rate demand at this instant. - */ - public override val demand: Double - get() = ctx?.demand ?: 0.0 - - /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() - - /** - * The [FlowConsumerContext] that is currently running. - */ - protected var ctx: FlowConsumerContext? = null - private set - - /** - * Construct the [FlowConsumerLogic] instance for a new source. - */ - protected abstract fun createLogic(): FlowConsumerLogic - - /** - * Start the specified [FlowConsumerContext]. - */ - protected open fun start(ctx: FlowConsumerContext) { - ctx.start() - } - - /** - * The previous demand for the consumer. - */ - private var _previousDemand = 0.0 - private var _previousCapacity = 0.0 - - /** - * Update the counters of the flow consumer. - */ - protected fun updateCounters(ctx: FlowConnection, delta: Long) { - val demand = _previousDemand - val capacity = _previousCapacity - - _previousDemand = ctx.demand - _previousCapacity = ctx.capacity - - if (delta <= 0) { - return - } - - val counters = _counters - val deltaS = delta / 1000.0 - val total = demand * deltaS - val work = capacity * deltaS - val actualWork = ctx.rate * deltaS - - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) - } - - /** - * Update the counters of the flow consumer. - */ - protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters.demand += demand - counters.actual += actual - counters.remaining += remaining - } - - final override fun startConsumer(source: FlowSource) { - check(ctx == null) { "Consumer is in invalid state" } - val ctx = engine.newContext(source, createLogic()) - - ctx.capacity = capacity - this.ctx = ctx - - start(ctx) - } - - final override fun pull() { - ctx?.pull() - } - - final override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.close() - } - } - - override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index c327e1e9..8ff0bc76 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -52,6 +52,13 @@ public interface FlowConnection : AutoCloseable { public fun pull() /** + * Pull the source. + * + * @param now The timestamp at which the connection is pulled. + */ + public fun pull(now: Long) + + /** * Push the given flow [rate] over this connection. * * @param rate The rate of the flow to push. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index d7182497..98922ab3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -55,6 +55,8 @@ public interface FlowConsumerContext : FlowConnection { /** * Synchronously pull the source of the connection. + * + * @param now The timestamp at which the connection is pulled. */ - public fun pullSync() + public fun pullSync(now: Long) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7eaaf6c2..e3bdd7ba 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,7 +23,8 @@ package org.opendc.simulator.flow import mu.KotlinLogging -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.D_MS_TO_S +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max /** @@ -71,6 +72,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled _innerCtx?.pull() } + override fun pull(now: Long) { + _innerCtx?.pull(now) + } + @JvmField var lastPull = Long.MAX_VALUE override fun push(rate: Double) { @@ -117,7 +122,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override val counters: FlowCounters get() = _counters - private val _counters = FlowCountersImpl() + private val _counters = MutableFlowCounters() override fun startConsumer(source: FlowSource) { check(delegate == null) { "Forwarder already active" } @@ -241,12 +246,11 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } val counters = _counters - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index b4eb6a38..e9094443 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,9 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.D_MS_TO_S +import org.opendc.simulator.flow.internal.MutableFlowCounters + /** * A [FlowSink] represents a sink with a fixed capacity. * @@ -33,38 +36,120 @@ public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, private val parent: FlowConvergenceListener? = null -) : AbstractFlowConsumer(engine, initialCapacity) { +) : FlowConsumer { + /** + * A flag to indicate that the flow consumer is active. + */ + public override val isActive: Boolean + get() = _ctx != null + + /** + * The capacity of the consumer. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + _ctx?.capacity = value + } + + /** + * The current processing rate of the consumer. + */ + public override val rate: Double + get() = _ctx?.rate ?: 0.0 + + /** + * The flow processing rate demand at this instant. + */ + public override val demand: Double + get() = _ctx?.demand ?: 0.0 + + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() + + /** + * The current active [FlowConsumerLogic] of this sink. + */ + private var _ctx: FlowConsumerContext? = null + + override fun startConsumer(source: FlowSource) { + check(_ctx == null) { "Consumer is in invalid state" } - override fun start(ctx: FlowConsumerContext) { + val ctx = engine.newContext(source, Logic(parent, _counters)) + _ctx = ctx + + ctx.capacity = capacity if (parent != null) { ctx.shouldConsumerConverge = true } - super.start(ctx) + + ctx.start() } - override fun createLogic(): FlowConsumerLogic { - return object : FlowConsumerLogic { - private val parent = this@FlowSink.parent - - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - delta: Long, - rate: Double - ) { - updateCounters(ctx, delta) - } + override fun pull() { + _ctx?.pull() + } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta) - cancel() - } + override fun cancel() { + _ctx?.close() + } + + override fun toString(): String = "FlowSink[capacity=$capacity]" + + /** + * [FlowConsumerLogic] of a sink. + */ + private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + updateCounters(ctx, delta, rate, ctx.capacity) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + updateCounters(ctx, delta, 0.0, 0.0) + + _ctx = null + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now, delta) + } - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + /** + * The previous demand and capacity for the consumer. + */ + private val _previous = DoubleArray(2) + + /** + * Update the counters of the flow consumer. + */ + private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + val counters = counters + val previous = _previous + val demand = previous[0] + val capacity = previous[1] + + previous[0] = nextDemand + previous[1] = nextCapacity + + if (delta <= 0) { + return } + + val deltaS = delta * D_MS_TO_S + val total = demand * deltaS + val work = capacity * deltaS + val actualWork = ctx.rate * deltaS + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } - - override fun toString(): String = "FlowSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt new file mode 100644 index 00000000..450195ec --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Constant for converting milliseconds into seconds. + */ +internal const val D_MS_TO_S = 1 / 1000.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9a568897..58ca918b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private var _timer: FlowEngineImpl.Timer? = null - private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5) + private var _timer: Long = Long.MAX_VALUE + private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5) override fun start() { check(_flags and ConnState == ConnPending) { "Consumer is already started" } @@ -164,17 +164,21 @@ internal class FlowConsumerContextImpl( } } - override fun pull() { + override fun pull(now: Long) { val flags = _flags if (flags and ConnState != ConnActive) { return } // Mark connection as pulled - scheduleImmediate(_clock.millis(), flags or ConnPulled) + scheduleImmediate(now, flags or ConnPulled) } - override fun pullSync() { + override fun pull() { + pull(_clock.millis()) + } + + override fun pullSync(now: Long) { val flags = _flags // Do not attempt to flush the connection if the connection is closed or an update is already active @@ -182,8 +186,6 @@ internal class FlowConsumerContextImpl( return } - val now = _clock.millis() - if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { engine.scheduleSync(now, this) } @@ -217,8 +219,8 @@ internal class FlowConsumerContextImpl( */ fun doUpdate( now: Long, - visited: ArrayDeque<FlowConsumerContextImpl>, - timerQueue: PriorityQueue<FlowEngineImpl.Timer>, + visited: FlowDeque, + timerQueue: FlowTimerQueue, isImmediate: Boolean ) { var flags = _flags @@ -326,8 +328,7 @@ internal class FlowConsumerContextImpl( // Prune the head timer if this is a delayed update val timer = if (!isImmediate) { // Invariant: Any pending timer should only point to a future timestamp - // See also `scheduleDelayed` - val timer = pendingTimers.poll() + val timer = pendingTimers.poll() ?: Long.MAX_VALUE _timer = timer timer } else { @@ -342,7 +343,7 @@ internal class FlowConsumerContextImpl( if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || flags and ConnDisableTimers != 0 || - (timer != null && newDeadline >= timer.target) + (timer != Long.MAX_VALUE && newDeadline >= timer) ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer @@ -350,12 +351,11 @@ internal class FlowConsumerContextImpl( } // Construct a timer for the new deadline and add it to the global queue of timers - val newTimer = FlowEngineImpl.Timer(this, newDeadline) - _timer = newTimer - timerQueue.add(newTimer) + _timer = newDeadline + timerQueue.add(this, newDeadline) - // A timer already exists for this connection, so add it to the queue of pending timers - if (timer != null) { + // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers + if (timer != Long.MAX_VALUE) { pendingTimers.addFirst(timer) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt new file mode 100644 index 00000000..c6cba4b7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import java.util.* + +/** + * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations. + */ +internal class FlowDeque(initialCapacity: Int = 256) { + /** + * The array of elements in the queue. + */ + private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity) + private var _head = 0 + private var _tail = 0 + + /** + * Determine whether this queue is not empty. + */ + fun isNotEmpty(): Boolean { + return _head != _tail + } + + /** + * Add the specified [ctx] to the queue. + */ + fun add(ctx: FlowConsumerContextImpl) { + val es = _elements + var tail = _tail + + es[tail] = ctx + + tail = inc(tail, es.size) + _tail = tail + + if (_head == tail) { + doubleCapacity() + } + } + + /** + * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty. + */ + fun poll(): FlowConsumerContextImpl? { + val es = _elements + val head = _head + val ctx = es[head] + + if (ctx != null) { + es[head] = null + _head = inc(head, es.size) + } + + return ctx + } + + /** + * Clear the queue. + */ + fun clear() { + _elements.fill(null) + _head = 0 + _tail = 0 + } + + private fun inc(i: Int, modulus: Int): Int { + var x = i + if (++x >= modulus) { + x = 0 + } + return x + } + + /** + * Doubles the capacity of this deque + */ + private fun doubleCapacity() { + assert(_head == _tail) + val p = _head + val n = _elements.size + val r = n - p // number of elements to the right of p + + val newCapacity = n shl 1 + check(newCapacity >= 0) { "Sorry, deque too big" } + + val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity) + + _elements.copyInto(a, 0, p, r) + _elements.copyInto(a, r, 0, p) + + _elements = a + _head = 0 + _tail = n + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index a9234abf..3c79d54e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable { +internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /** * The queue of connection updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque<FlowConsumerContextImpl>() + private val queue = FlowDeque() /** * A priority queue containing the connection updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue<Timer>() + private val futureQueue = FlowTimerQueue() /** * The stack of engine invocations to occur in the future. @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /** * The systems that have been visited during the engine cycle. */ - private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque() + private val visited = FlowDeque() /** * The index in the batch stack. @@ -71,6 +71,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va private var batchIndex = 0 /** + * The virtual [Clock] of this engine. + */ + override val clock: Clock + get() = _clock + private val _clock: Clock = clock + + /** * Update the specified [ctx] synchronously. */ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { @@ -113,7 +120,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va try { // Flush the work if the engine is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - doRunEngine(clock.millis()) + doRunEngine(_clock.millis()) } } finally { batchIndex-- @@ -122,11 +129,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /* Runnable */ override fun run() { - val now = clock.millis() val invocation = futureInvocations.poll() // Clear invocation from future invocation queue - assert(now >= invocation.timestamp) { "Future invocations invariant violated" } - - doRunEngine(now) + doRunEngine(invocation.timestamp) } /** @@ -144,17 +148,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va // Execute all scheduled updates at current timestamp while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target - - if (target > now) { - break - } - - assert(target >= now) { "Internal inconsistency: found update of the past" } - - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) + val ctx = futureQueue.poll(now) ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -177,9 +172,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va } // Schedule an engine invocation for the next update to occur. - val headTimer = futureQueue.peek() - if (headTimer != null) { - trySchedule(now, futureInvocations, headTimer.target) + val headDeadline = futureQueue.peekDeadline() + if (headDeadline != Long.MAX_VALUE) { + trySchedule(now, futureInvocations, headDeadline) } } @@ -217,17 +212,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va */ fun cancel() = handle.dispose() } - - /** - * An update call for [ctx] that is scheduled for [target]. - * - * This class represents an update in the future at [target] requested by [ctx]. - */ - class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> { - override fun compareTo(other: Timer): Int { - return target.compareTo(other.target) - } - - override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt new file mode 100644 index 00000000..22a390e6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Specialized priority queue for flow timers. + */ +internal class FlowTimerQueue(initialCapacity: Int = 256) { + /** + * The binary heap of deadlines. + */ + private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE } + + /** + * The binary heap of [FlowConsumerContextImpl]s. + */ + private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity) + + /** + * The number of elements in the priority queue. + */ + private var size = 0 + + /** + * Register a timer for [ctx] with [deadline]. + */ + fun add(ctx: FlowConsumerContextImpl, deadline: Long) { + val i = size + val deadlines = _deadlines + if (i >= deadlines.size) { + grow() + } + + siftUp(deadlines, _pending, i, ctx, deadline) + + size = i + 1 + } + + /** + * Update all pending [FlowConsumerContextImpl]s at the timestamp [now]. + */ + fun poll(now: Long): FlowConsumerContextImpl? { + if (size == 0) { + return null + } + + val deadlines = _deadlines + val deadline = deadlines[0] + + if (now < deadline) { + return null + } + + val pending = _pending + val res = pending[0] + val s = --size + + val nextDeadline = deadlines[s] + val next = pending[s]!! + + // Clear the last element of the queue + pending[s] = null + deadlines[s] = Long.MIN_VALUE + + if (s != 0) { + siftDown(deadlines, pending, next, nextDeadline) + } + + return res + } + + /** + * Find the earliest deadline in the queue. + */ + fun peekDeadline(): Long { + return if (size == 0) Long.MAX_VALUE else _deadlines[0] + } + + /** + * Increases the capacity of the array. + */ + private fun grow() { + val oldCapacity = _deadlines.size + // Double size if small; else grow by 50% + val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1 + + _deadlines = _deadlines.copyOf(newCapacity) + _pending = _pending.copyOf(newCapacity) + } + + /** + * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is + * greater than or equal to its parent, or is the root. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param pos The position to fill. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftUp( + deadlines: LongArray, + pending: Array<FlowConsumerContextImpl?>, + pos: Int, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = pos + + while (k > 0) { + val parent = (k - 1) ushr 1 + val parentDeadline = deadlines[parent] + + if (deadline >= parentDeadline) { + break + } + + deadlines[k] = parentDeadline + pending[k] = pending[parent] + + k = parent + } + + deadlines[k] = deadline + pending[k] = ctx + } + + /** + * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it + * is less than or equal to its children or is a leaf. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftDown( + deadlines: LongArray, + pending: Array<FlowConsumerContextImpl?>, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = 0 + val size = size + val half = size ushr 1 + + while (k < half) { + var child = (k shl 1) + 1 + + var childDeadline = deadlines[child] + val right = child + 1 + + if (right < size) { + val rightDeadline = deadlines[right] + + if (childDeadline > rightDeadline) { + child = right + childDeadline = rightDeadline + } + } + + if (deadline <= childDeadline) { + break + } + + deadlines[k] = childDeadline + pending[k] = pending[child] + + k = child + } + + deadlines[k] = deadline + pending[k] = ctx + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt index d2fa5228..d990dc61 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -27,17 +27,27 @@ import org.opendc.simulator.flow.FlowCounters /** * Mutable implementation of the [FlowCounters] interface. */ -internal class FlowCountersImpl : FlowCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var remaining: Double = 0.0 - override var interference: Double = 0.0 +public class MutableFlowCounters : FlowCounters { + override val demand: Double + get() = _counters[0] + override val actual: Double + get() = _counters[1] + override val remaining: Double + get() = _counters[2] + override val interference: Double + get() = _counters[3] + private val _counters = DoubleArray(4) override fun reset() { - demand = 0.0 - actual = 0.0 - remaining = 0.0 - interference = 0.0 + _counters.fill(0.0) + } + + public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + val counters = _counters + counters[0] += demand + counters[1] += actual + counters[2] += remaining + counters[3] += interference } override fun toString(): String { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 97059e93..a0fb8a4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,7 +25,8 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.D_MS_TO_S +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -85,7 +86,7 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key) + val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) _inputs.add(provider) return provider } @@ -135,11 +136,11 @@ public class MaxMinFlowMultiplexer( /** * Helper class containing the scheduler state. */ - private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { /** * The flow counters of this scheduler. */ - @JvmField val counters = FlowCountersImpl() + @JvmField val counters = MutableFlowCounters() /** * The flow rate of the multiplexer. @@ -167,6 +168,11 @@ public class MaxMinFlowMultiplexer( private val _activeInputs = mutableListOf<Input>() /** + * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList]. + */ + private var _inputArray = emptyArray<Input>() + + /** * The active outputs registered with the scheduler. */ private val _activeOutputs = mutableListOf<Output>() @@ -184,10 +190,16 @@ public class MaxMinFlowMultiplexer( private var _lastConvergeInput: Input? = null /** + * The simulation clock. + */ + private val _clock = engine.clock + + /** * Register the specified [input] to this scheduler. */ fun registerInput(input: Input) { _activeInputs.add(input) + _inputArray = _activeInputs.toTypedArray() val hasActivationOutput = activationOutput != null @@ -195,7 +207,8 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity - trigger(engine.clock.millis()) + + trigger(_clock.millis()) } /** @@ -207,6 +220,8 @@ public class MaxMinFlowMultiplexer( _lastConvergeInput = null } + _activeInputs.remove(input) + // Re-run scheduler to distribute new load trigger(now) } @@ -287,7 +302,7 @@ public class MaxMinFlowMultiplexer( // a few inputs and little changes at the same timestamp. // We always pick for option (1) unless there are no outputs available. if (activationOutput != null) { - activationOutput.pull() + activationOutput.pull(now) return } else { runScheduler(now) @@ -305,7 +320,7 @@ public class MaxMinFlowMultiplexer( return try { _schedulerActive = true - doRunScheduler(delta) + doRunScheduler(now, delta) } finally { _schedulerActive = false } @@ -356,15 +371,17 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(delta: Long): Long { + private fun doRunScheduler(now: Long, delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + var inputArray = _inputArray + var inputSize = _inputArray.size // Update the counters of the scheduler updateCounters(delta) // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { + if (inputSize == 0) { demand = 0.0 rate = 0.0 return Long.MAX_VALUE @@ -372,53 +389,70 @@ public class MaxMinFlowMultiplexer( val capacity = capacity var availableCapacity = capacity + var deadline = Long.MAX_VALUE + var demand = 0.0 + var shouldRebuild = false - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { - input.pullSync() + // Pull in the work of the inputs + for (i in 0 until inputSize) { + val input = inputArray[i] - // Remove outputs that have finished + input.pullSync(now) + + // Remove inputs that have finished if (!input.isActive) { input.actualRate = 0.0 - inputIterator.remove() + shouldRebuild = true + } else { + demand += input.limit + deadline = min(deadline, input.deadline) } } - var demand = 0.0 - var deadline = Long.MAX_VALUE + // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs` + if (shouldRebuild) { + inputArray = activeInputs.toTypedArray() + inputSize = inputArray.size + _inputArray = inputArray + } - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + val rate = if (demand > capacity) { + // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the + // constrained capacity across the inputs. - // Divide the available output capacity fairly over the inputs using max-min fair sharing - val size = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / (size - i) - val grantedRate = min(input.allowedRate, availableShare) + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + inputArray.sort() - demand += input.limit - deadline = min(deadline, input.deadline) - availableCapacity -= grantedRate + // Divide the available output capacity fairly over the inputs using max-min fair sharing + for (i in 0 until inputSize) { + val input = inputArray[i] + val availableShare = availableCapacity / (inputSize - i) + val grantedRate = min(input.allowedRate, availableShare) - input.actualRate = grantedRate - } + availableCapacity -= grantedRate + input.actualRate = grantedRate + } - val rate = capacity - availableCapacity + capacity - availableCapacity + } else { + demand + } this.demand = demand - this.rate = rate - - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - output.push(grantedSpeed) + if (this.rate != rate) { + // Only update the outputs if the output rate has changed + this.rate = rate + + // Divide the requests over the available capacity of the input resources fairly + for (i in activeOutputs.indices) { + val output = activeOutputs[i] + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } } return deadline @@ -440,11 +474,16 @@ public class MaxMinFlowMultiplexer( return } - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S + val demand = demand + val rate = rate - counters.demand += demand * deltaS - counters.actual += rate * deltaS - counters.remaining += (previousCapacity - rate) * deltaS + counters.increment( + demand = demand * deltaS, + actual = rate * deltaS, + remaining = (previousCapacity - rate) * deltaS, + interference = 0.0 + ) } } @@ -452,32 +491,48 @@ public class MaxMinFlowMultiplexer( * An internal [FlowConsumer] implementation for multiplexer inputs. */ private class Input( - engine: FlowEngine, + private val engine: FlowEngine, private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey? - ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> { + @JvmField val key: InterferenceKey?, + initialCapacity: Double, + ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { /** - * The requested limit. + * A flag to indicate that the consumer is active. */ - @JvmField var limit: Double = 0.0 + override val isActive: Boolean + get() = _ctx != null /** - * The actual processing speed. + * The demand of the consumer. */ - @JvmField var actualRate: Double = 0.0 + override val demand: Double + get() = limit /** - * The deadline of the input. + * The processing rate of the consumer. */ - val deadline: Long - get() = ctx?.deadline ?: Long.MAX_VALUE + override val rate: Double + get() = actualRate /** - * The processing rate that is allowed by the model constraints. + * The capacity of the input. */ - val allowedRate: Double - get() = min(capacity, limit) + override var capacity: Double + get() = _capacity + set(value) { + allowedRate = min(limit, value) + _capacity = value + _ctx?.capacity = value + } + private var _capacity = initialCapacity + + /** + * The flow counters to track the flow metrics of the consumer. + */ + override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() /** * A flag to enable timers for the input. @@ -485,7 +540,7 @@ public class MaxMinFlowMultiplexer( var enableTimers: Boolean = true set(value) { field = value - ctx?.enableTimers = value + _ctx?.enableTimers = value } /** @@ -494,10 +549,36 @@ public class MaxMinFlowMultiplexer( var shouldConsumerConverge: Boolean = true set(value) { field = value - ctx?.shouldConsumerConverge = value + _ctx?.shouldConsumerConverge = value } /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + + /** + * The deadline of the input. + */ + val deadline: Long + get() = _ctx?.deadline ?: Long.MAX_VALUE + + /** + * The [FlowConsumerContext] that is currently running. + */ + private var _ctx: FlowConsumerContext? = null + + /** * A flag to indicate that the input is closed. */ private var _isClosed: Boolean = false @@ -512,13 +593,33 @@ public class MaxMinFlowMultiplexer( cancel() } - /* AbstractFlowConsumer */ - override fun createLogic(): FlowConsumerLogic = this + /** + * Pull the source if necessary. + */ + fun pullSync(now: Long) { + _ctx?.pullSync(now) + } - override fun start(ctx: FlowConsumerContext) { + /* FlowConsumer */ + override fun startConsumer(source: FlowSource) { check(!_isClosed) { "Cannot re-use closed input" } + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, this) + _ctx = ctx + + ctx.capacity = capacity scheduler.registerInput(this) - super.start(ctx) + + ctx.start() + } + + override fun pull() { + _ctx?.pull() + } + + override fun cancel() { + _ctx?.close() } /* FlowConsumerLogic */ @@ -530,8 +631,10 @@ public class MaxMinFlowMultiplexer( ) { doUpdateCounters(delta) - actualRate = 0.0 + val allowed = min(rate, capacity) limit = rate + actualRate = allowed + allowedRate = allowed scheduler.trigger(now) } @@ -541,11 +644,11 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 + allowedRate = 0.0 scheduler.deregisterInput(this, now) - // BUG: Cancel the connection so that `ctx` is set to `null` - cancel() + _ctx = null } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -556,13 +659,6 @@ public class MaxMinFlowMultiplexer( override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) /** - * Pull the source if necessary. - */ - fun pullSync() { - ctx?.pullSync() - } - - /** * Helper method to update the flow counters of the multiplexer. */ private fun doUpdateCounters(delta: Long) { @@ -578,14 +674,16 @@ public class MaxMinFlowMultiplexer( 1.0 } - val deltaS = delta / 1000.0 + val actualRate = actualRate + + val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS - val remaining = (capacity - actualRate) * deltaS + val remaining = (_capacity - actualRate) * deltaS + val interference = actual * max(0.0, 1 - perfScore) - updateCounters(demand, actual, remaining) - - scheduler.counters.interference += actual * max(0.0, 1 - perfScore) + _counters.increment(demand, actual, remaining, interference) + scheduler.counters.increment(0.0, 0.0, 0.0, interference) } } @@ -636,8 +734,8 @@ public class MaxMinFlowMultiplexer( /** * Pull this output. */ - fun pull() { - _conn?.pull() + fun pull(now: Long) { + _conn?.pull(now) } override fun onStart(conn: FlowConnection, now: Long) { @@ -675,6 +773,7 @@ public class MaxMinFlowMultiplexer( // Output is not the activation output, so trigger activation output and do not install timer for this // output (by returning `Long.MAX_VALUE`) scheduler.trigger(now) + Long.MAX_VALUE } } |
