diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-08 16:49:55 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-08 17:11:53 +0200 |
| commit | 4433185388f843140aad096dfdd88dfe2398bf2b (patch) | |
| tree | 7d64311512b23360c1c6401c54a95a57dcdb98b7 /opendc-simulator | |
| parent | f9483bc5782d86637777c0d21c383ce3e2c0851b (diff) | |
perf(simulator): Specialize FlowEngine queues
This change specializes the queues used by the FlowEngine implementation
in order to reduce the overhead caused by type-erasure of generics.
Diffstat (limited to 'opendc-simulator')
4 files changed, 329 insertions, 42 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9a568897..0baa7880 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private var _timer: FlowEngineImpl.Timer? = null - private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5) + private var _timer: Long = Long.MAX_VALUE + private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5) override fun start() { check(_flags and ConnState == ConnPending) { "Consumer is already started" } @@ -217,8 +217,8 @@ internal class FlowConsumerContextImpl( */ fun doUpdate( now: Long, - visited: ArrayDeque<FlowConsumerContextImpl>, - timerQueue: PriorityQueue<FlowEngineImpl.Timer>, + visited: FlowDeque, + timerQueue: FlowTimerQueue, isImmediate: Boolean ) { var flags = _flags @@ -326,8 +326,7 @@ internal class FlowConsumerContextImpl( // Prune the head timer if this is a delayed update val timer = if (!isImmediate) { // Invariant: Any pending timer should only point to a future timestamp - // See also `scheduleDelayed` - val timer = pendingTimers.poll() + val timer = pendingTimers.poll() ?: Long.MAX_VALUE _timer = timer timer } else { @@ -342,7 +341,7 @@ internal class FlowConsumerContextImpl( if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || flags and ConnDisableTimers != 0 || - (timer != null && newDeadline >= timer.target) + (timer != Long.MAX_VALUE && newDeadline >= timer) ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer @@ -350,12 +349,11 @@ internal class FlowConsumerContextImpl( } // Construct a timer for the new deadline and add it to the global queue of timers - val newTimer = FlowEngineImpl.Timer(this, newDeadline) - _timer = newTimer - timerQueue.add(newTimer) + _timer = newDeadline + timerQueue.add(this, newDeadline) - // A timer already exists for this connection, so add it to the queue of pending timers - if (timer != null) { + // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers + if (timer != Long.MAX_VALUE) { pendingTimers.addFirst(timer) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt new file mode 100644 index 00000000..c6cba4b7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import java.util.* + +/** + * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations. + */ +internal class FlowDeque(initialCapacity: Int = 256) { + /** + * The array of elements in the queue. + */ + private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity) + private var _head = 0 + private var _tail = 0 + + /** + * Determine whether this queue is not empty. + */ + fun isNotEmpty(): Boolean { + return _head != _tail + } + + /** + * Add the specified [ctx] to the queue. + */ + fun add(ctx: FlowConsumerContextImpl) { + val es = _elements + var tail = _tail + + es[tail] = ctx + + tail = inc(tail, es.size) + _tail = tail + + if (_head == tail) { + doubleCapacity() + } + } + + /** + * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty. + */ + fun poll(): FlowConsumerContextImpl? { + val es = _elements + val head = _head + val ctx = es[head] + + if (ctx != null) { + es[head] = null + _head = inc(head, es.size) + } + + return ctx + } + + /** + * Clear the queue. + */ + fun clear() { + _elements.fill(null) + _head = 0 + _tail = 0 + } + + private fun inc(i: Int, modulus: Int): Int { + var x = i + if (++x >= modulus) { + x = 0 + } + return x + } + + /** + * Doubles the capacity of this deque + */ + private fun doubleCapacity() { + assert(_head == _tail) + val p = _head + val n = _elements.size + val r = n - p // number of elements to the right of p + + val newCapacity = n shl 1 + check(newCapacity >= 0) { "Sorry, deque too big" } + + val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity) + + _elements.copyInto(a, 0, p, r) + _elements.copyInto(a, r, 0, p) + + _elements = a + _head = 0 + _tail = n + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 450556f8..55debef0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /** * The queue of connection updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque<FlowConsumerContextImpl>() + private val queue = FlowDeque() /** * A priority queue containing the connection updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue<Timer>() + private val futureQueue = FlowTimerQueue() /** * The stack of engine invocations to occur in the future. @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /** * The systems that have been visited during the engine cycle. */ - private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque() + private val visited = FlowDeque() /** * The index in the batch stack. @@ -151,17 +151,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc // Execute all scheduled updates at current timestamp while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target - - if (target > now) { - break - } - - assert(target >= now) { "Internal inconsistency: found update of the past" } - - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) + val ctx = futureQueue.poll(now) ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -184,9 +175,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc } // Schedule an engine invocation for the next update to occur. - val headTimer = futureQueue.peek() - if (headTimer != null) { - trySchedule(now, futureInvocations, headTimer.target) + val headDeadline = futureQueue.peekDeadline() + if (headDeadline != Long.MAX_VALUE) { + trySchedule(now, futureInvocations, headDeadline) } } @@ -224,17 +215,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc */ fun cancel() = handle.dispose() } - - /** - * An update call for [ctx] that is scheduled for [target]. - * - * This class represents an update in the future at [target] requested by [ctx]. - */ - class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> { - override fun compareTo(other: Timer): Int { - return target.compareTo(other.target) - } - - override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt new file mode 100644 index 00000000..22a390e6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Specialized priority queue for flow timers. + */ +internal class FlowTimerQueue(initialCapacity: Int = 256) { + /** + * The binary heap of deadlines. + */ + private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE } + + /** + * The binary heap of [FlowConsumerContextImpl]s. + */ + private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity) + + /** + * The number of elements in the priority queue. + */ + private var size = 0 + + /** + * Register a timer for [ctx] with [deadline]. + */ + fun add(ctx: FlowConsumerContextImpl, deadline: Long) { + val i = size + val deadlines = _deadlines + if (i >= deadlines.size) { + grow() + } + + siftUp(deadlines, _pending, i, ctx, deadline) + + size = i + 1 + } + + /** + * Update all pending [FlowConsumerContextImpl]s at the timestamp [now]. + */ + fun poll(now: Long): FlowConsumerContextImpl? { + if (size == 0) { + return null + } + + val deadlines = _deadlines + val deadline = deadlines[0] + + if (now < deadline) { + return null + } + + val pending = _pending + val res = pending[0] + val s = --size + + val nextDeadline = deadlines[s] + val next = pending[s]!! + + // Clear the last element of the queue + pending[s] = null + deadlines[s] = Long.MIN_VALUE + + if (s != 0) { + siftDown(deadlines, pending, next, nextDeadline) + } + + return res + } + + /** + * Find the earliest deadline in the queue. + */ + fun peekDeadline(): Long { + return if (size == 0) Long.MAX_VALUE else _deadlines[0] + } + + /** + * Increases the capacity of the array. + */ + private fun grow() { + val oldCapacity = _deadlines.size + // Double size if small; else grow by 50% + val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1 + + _deadlines = _deadlines.copyOf(newCapacity) + _pending = _pending.copyOf(newCapacity) + } + + /** + * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is + * greater than or equal to its parent, or is the root. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param pos The position to fill. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftUp( + deadlines: LongArray, + pending: Array<FlowConsumerContextImpl?>, + pos: Int, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = pos + + while (k > 0) { + val parent = (k - 1) ushr 1 + val parentDeadline = deadlines[parent] + + if (deadline >= parentDeadline) { + break + } + + deadlines[k] = parentDeadline + pending[k] = pending[parent] + + k = parent + } + + deadlines[k] = deadline + pending[k] = ctx + } + + /** + * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it + * is less than or equal to its children or is a leaf. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftDown( + deadlines: LongArray, + pending: Array<FlowConsumerContextImpl?>, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = 0 + val size = size + val half = size ushr 1 + + while (k < half) { + var child = (k shl 1) + 1 + + var childDeadline = deadlines[child] + val right = child + 1 + + if (right < size) { + val rightDeadline = deadlines[right] + + if (childDeadline > rightDeadline) { + child = right + childDeadline = rightDeadline + } + } + + if (deadline <= childDeadline) { + break + } + + deadlines[k] = childDeadline + pending[k] = pending[child] + + k = child + } + + deadlines[k] = deadline + pending[k] = ctx + } +} |
