summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt22
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt116
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt38
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt195
4 files changed, 329 insertions, 42 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 9a568897..0baa7880 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl(
/**
* The timers at which the context is scheduled to be interrupted.
*/
- private var _timer: FlowEngineImpl.Timer? = null
- private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5)
+ private var _timer: Long = Long.MAX_VALUE
+ private val _pendingTimers: ArrayDeque<Long> = ArrayDeque(5)
override fun start() {
check(_flags and ConnState == ConnPending) { "Consumer is already started" }
@@ -217,8 +217,8 @@ internal class FlowConsumerContextImpl(
*/
fun doUpdate(
now: Long,
- visited: ArrayDeque<FlowConsumerContextImpl>,
- timerQueue: PriorityQueue<FlowEngineImpl.Timer>,
+ visited: FlowDeque,
+ timerQueue: FlowTimerQueue,
isImmediate: Boolean
) {
var flags = _flags
@@ -326,8 +326,7 @@ internal class FlowConsumerContextImpl(
// Prune the head timer if this is a delayed update
val timer = if (!isImmediate) {
// Invariant: Any pending timer should only point to a future timestamp
- // See also `scheduleDelayed`
- val timer = pendingTimers.poll()
+ val timer = pendingTimers.poll() ?: Long.MAX_VALUE
_timer = timer
timer
} else {
@@ -342,7 +341,7 @@ internal class FlowConsumerContextImpl(
if (newDeadline == Long.MAX_VALUE ||
flags and ConnState != ConnActive ||
flags and ConnDisableTimers != 0 ||
- (timer != null && newDeadline >= timer.target)
+ (timer != Long.MAX_VALUE && newDeadline >= timer)
) {
// Ignore any deadline scheduled at the maximum value
// This indicates that the source does not want to register a timer
@@ -350,12 +349,11 @@ internal class FlowConsumerContextImpl(
}
// Construct a timer for the new deadline and add it to the global queue of timers
- val newTimer = FlowEngineImpl.Timer(this, newDeadline)
- _timer = newTimer
- timerQueue.add(newTimer)
+ _timer = newDeadline
+ timerQueue.add(this, newDeadline)
- // A timer already exists for this connection, so add it to the queue of pending timers
- if (timer != null) {
+ // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers
+ if (timer != Long.MAX_VALUE) {
pendingTimers.addFirst(timer)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
new file mode 100644
index 00000000..c6cba4b7
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+import java.util.*
+
+/**
+ * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations.
+ */
+internal class FlowDeque(initialCapacity: Int = 256) {
+ /**
+ * The array of elements in the queue.
+ */
+ private var _elements: Array<FlowConsumerContextImpl?> = arrayOfNulls(initialCapacity)
+ private var _head = 0
+ private var _tail = 0
+
+ /**
+ * Determine whether this queue is not empty.
+ */
+ fun isNotEmpty(): Boolean {
+ return _head != _tail
+ }
+
+ /**
+ * Add the specified [ctx] to the queue.
+ */
+ fun add(ctx: FlowConsumerContextImpl) {
+ val es = _elements
+ var tail = _tail
+
+ es[tail] = ctx
+
+ tail = inc(tail, es.size)
+ _tail = tail
+
+ if (_head == tail) {
+ doubleCapacity()
+ }
+ }
+
+ /**
+ * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty.
+ */
+ fun poll(): FlowConsumerContextImpl? {
+ val es = _elements
+ val head = _head
+ val ctx = es[head]
+
+ if (ctx != null) {
+ es[head] = null
+ _head = inc(head, es.size)
+ }
+
+ return ctx
+ }
+
+ /**
+ * Clear the queue.
+ */
+ fun clear() {
+ _elements.fill(null)
+ _head = 0
+ _tail = 0
+ }
+
+ private fun inc(i: Int, modulus: Int): Int {
+ var x = i
+ if (++x >= modulus) {
+ x = 0
+ }
+ return x
+ }
+
+ /**
+ * Doubles the capacity of this deque
+ */
+ private fun doubleCapacity() {
+ assert(_head == _tail)
+ val p = _head
+ val n = _elements.size
+ val r = n - p // number of elements to the right of p
+
+ val newCapacity = n shl 1
+ check(newCapacity >= 0) { "Sorry, deque too big" }
+
+ val a = arrayOfNulls<FlowConsumerContextImpl>(newCapacity)
+
+ _elements.copyInto(a, 0, p, r)
+ _elements.copyInto(a, r, 0, p)
+
+ _elements = a
+ _head = 0
+ _tail = n
+ }
+}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 450556f8..55debef0 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc
/**
* The queue of connection updates that are scheduled for immediate execution.
*/
- private val queue = ArrayDeque<FlowConsumerContextImpl>()
+ private val queue = FlowDeque()
/**
* A priority queue containing the connection updates to be scheduled in the future.
*/
- private val futureQueue = PriorityQueue<Timer>()
+ private val futureQueue = FlowTimerQueue()
/**
* The stack of engine invocations to occur in the future.
@@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc
/**
* The systems that have been visited during the engine cycle.
*/
- private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque()
+ private val visited = FlowDeque()
/**
* The index in the batch stack.
@@ -151,17 +151,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc
// Execute all scheduled updates at current timestamp
while (true) {
- val timer = futureQueue.peek() ?: break
- val target = timer.target
-
- if (target > now) {
- break
- }
-
- assert(target >= now) { "Internal inconsistency: found update of the past" }
-
- futureQueue.poll()
- timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
+ val ctx = futureQueue.poll(now) ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
}
// Repeat execution of all immediate updates until the system has converged to a steady-state
@@ -184,9 +175,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc
}
// Schedule an engine invocation for the next update to occur.
- val headTimer = futureQueue.peek()
- if (headTimer != null) {
- trySchedule(now, futureInvocations, headTimer.target)
+ val headDeadline = futureQueue.peekDeadline()
+ if (headDeadline != Long.MAX_VALUE) {
+ trySchedule(now, futureInvocations, headDeadline)
}
}
@@ -224,17 +215,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc
*/
fun cancel() = handle.dispose()
}
-
- /**
- * An update call for [ctx] that is scheduled for [target].
- *
- * This class represents an update in the future at [target] requested by [ctx].
- */
- class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable<Timer> {
- override fun compareTo(other: Timer): Int {
- return target.compareTo(other.target)
- }
-
- override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]"
- }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
new file mode 100644
index 00000000..22a390e6
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * Specialized priority queue for flow timers.
+ */
+internal class FlowTimerQueue(initialCapacity: Int = 256) {
+ /**
+ * The binary heap of deadlines.
+ */
+ private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE }
+
+ /**
+ * The binary heap of [FlowConsumerContextImpl]s.
+ */
+ private var _pending = arrayOfNulls<FlowConsumerContextImpl>(initialCapacity)
+
+ /**
+ * The number of elements in the priority queue.
+ */
+ private var size = 0
+
+ /**
+ * Register a timer for [ctx] with [deadline].
+ */
+ fun add(ctx: FlowConsumerContextImpl, deadline: Long) {
+ val i = size
+ val deadlines = _deadlines
+ if (i >= deadlines.size) {
+ grow()
+ }
+
+ siftUp(deadlines, _pending, i, ctx, deadline)
+
+ size = i + 1
+ }
+
+ /**
+ * Update all pending [FlowConsumerContextImpl]s at the timestamp [now].
+ */
+ fun poll(now: Long): FlowConsumerContextImpl? {
+ if (size == 0) {
+ return null
+ }
+
+ val deadlines = _deadlines
+ val deadline = deadlines[0]
+
+ if (now < deadline) {
+ return null
+ }
+
+ val pending = _pending
+ val res = pending[0]
+ val s = --size
+
+ val nextDeadline = deadlines[s]
+ val next = pending[s]!!
+
+ // Clear the last element of the queue
+ pending[s] = null
+ deadlines[s] = Long.MIN_VALUE
+
+ if (s != 0) {
+ siftDown(deadlines, pending, next, nextDeadline)
+ }
+
+ return res
+ }
+
+ /**
+ * Find the earliest deadline in the queue.
+ */
+ fun peekDeadline(): Long {
+ return if (size == 0) Long.MAX_VALUE else _deadlines[0]
+ }
+
+ /**
+ * Increases the capacity of the array.
+ */
+ private fun grow() {
+ val oldCapacity = _deadlines.size
+ // Double size if small; else grow by 50%
+ val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1
+
+ _deadlines = _deadlines.copyOf(newCapacity)
+ _pending = _pending.copyOf(newCapacity)
+ }
+
+ /**
+ * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is
+ * greater than or equal to its parent, or is the root.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param pos The position to fill.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftUp(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ pos: Int,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = pos
+
+ while (k > 0) {
+ val parent = (k - 1) ushr 1
+ val parentDeadline = deadlines[parent]
+
+ if (deadline >= parentDeadline) {
+ break
+ }
+
+ deadlines[k] = parentDeadline
+ pending[k] = pending[parent]
+
+ k = parent
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+
+ /**
+ * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it
+ * is less than or equal to its children or is a leaf.
+ *
+ * @param deadlines The heap of deadlines.
+ * @param pending The heap of contexts.
+ * @param ctx The [FlowConsumerContextImpl] to insert.
+ * @param deadline The deadline of the context.
+ */
+ private fun siftDown(
+ deadlines: LongArray,
+ pending: Array<FlowConsumerContextImpl?>,
+ ctx: FlowConsumerContextImpl,
+ deadline: Long
+ ) {
+ var k = 0
+ val size = size
+ val half = size ushr 1
+
+ while (k < half) {
+ var child = (k shl 1) + 1
+
+ var childDeadline = deadlines[child]
+ val right = child + 1
+
+ if (right < size) {
+ val rightDeadline = deadlines[right]
+
+ if (childDeadline > rightDeadline) {
+ child = right
+ childDeadline = rightDeadline
+ }
+ }
+
+ if (deadline <= childDeadline) {
+ break
+ }
+
+ deadlines[k] = childDeadline
+ pending[k] = pending[child]
+
+ k = child
+ }
+
+ deadlines[k] = deadline
+ pending[k] = ctx
+ }
+}