diff options
4 files changed, 127 insertions, 149 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index ffc50ad8..04413db5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -212,7 +212,7 @@ class CapelinIntegrationTest { { assertEquals(6013899, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724501, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12027839, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(477664, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(475891, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt index d7ea0043..9cbf849d 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceContextImpl.kt @@ -24,6 +24,7 @@ package org.opendc.simulator.resources.impl import org.opendc.simulator.resources.* import java.time.Clock +import java.util.ArrayDeque import kotlin.math.max import kotlin.math.min @@ -96,9 +97,9 @@ internal class SimResourceContextImpl( private var _flag: Int = 0 /** - * The current pending update. + * The timers at which the context is scheduled to be interrupted. */ - private var _pendingUpdate: SimResourceInterpreterImpl.Update? = null + private val _timers: ArrayDeque<SimResourceInterpreterImpl.Timer> = ArrayDeque() override fun start() { check(_state == SimResourceState.Pending) { "Consumer is already started" } @@ -126,7 +127,7 @@ internal class SimResourceContextImpl( } _flag = _flag or FLAG_INTERRUPT - scheduleUpdate() + scheduleUpdate(clock.millis()) } override fun invalidate() { @@ -135,7 +136,7 @@ internal class SimResourceContextImpl( } _flag = _flag or FLAG_INVALIDATE - scheduleUpdate() + scheduleUpdate(clock.millis()) } override fun flush() { @@ -143,7 +144,7 @@ internal class SimResourceContextImpl( return } - interpreter.scheduleSync(this) + interpreter.scheduleSync(clock.millis(), this) } override fun push(rate: Double) { @@ -154,9 +155,9 @@ internal class SimResourceContextImpl( /** * Determine whether the state of the resource context should be updated. */ - fun requiresUpdate(timestamp: Long): Boolean { + fun shouldUpdate(timestamp: Long): Boolean { // Either the resource context is flagged or there is a pending update at this timestamp - return _flag != 0 || _pendingUpdate?.timestamp == timestamp + return _flag != 0 || _deadline == timestamp } /** @@ -204,7 +205,7 @@ internal class SimResourceContextImpl( _deadline = target - scheduleUpdate(target) + scheduleUpdate(timestamp, target) } SimResourceState.Pending -> if (oldState != SimResourceState.Pending) { @@ -229,6 +230,30 @@ internal class SimResourceContextImpl( } } + /** + * Prune the elapsed timers from this context. + */ + fun pruneTimers(now: Long) { + val timers = _timers + while (true) { + val head = timers.peek() + if (head == null || head.target > now) { + break + } + timers.poll() + } + } + + /** + * Try to re-schedule the resource context in case it was skipped. + */ + fun tryReschedule(now: Long) { + val deadline = _deadline + if (deadline > now && deadline != Long.MAX_VALUE) { + scheduleUpdate(now, deadline) + } + } + override fun toString(): String = "SimResourceContextImpl[capacity=$capacity]" /** @@ -279,35 +304,17 @@ internal class SimResourceContextImpl( /** * Schedule an update for this resource context. */ - private fun scheduleUpdate() { - // Cancel the pending update - val pendingUpdate = _pendingUpdate - if (pendingUpdate != null) { - _pendingUpdate = null - pendingUpdate.cancel() - } - - interpreter.scheduleImmediate(this) + private fun scheduleUpdate(now: Long) { + interpreter.scheduleImmediate(now, this) } /** * Schedule a delayed update for this resource context. */ - private fun scheduleUpdate(timestamp: Long) { - val pendingUpdate = _pendingUpdate - if (pendingUpdate != null) { - if (pendingUpdate.timestamp == timestamp) { - // Fast-path: A pending update for the same timestamp already exists - return - } else { - // Cancel the old pending update - _pendingUpdate = null - pendingUpdate.cancel() - } - } - - if (timestamp != Long.MAX_VALUE) { - _pendingUpdate = interpreter.scheduleDelayed(this, timestamp) + private fun scheduleUpdate(now: Long, target: Long) { + val timers = _timers + if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { + timers.addFirst(interpreter.scheduleDelayed(now, this, target)) } } diff --git a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt index c3dcebd0..2b6ec2ba 100644 --- a/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt +++ b/opendc-simulator/opendc-simulator-resources/src/main/kotlin/org/opendc/simulator/resources/impl/SimResourceInterpreterImpl.kt @@ -53,7 +53,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * A priority queue containing the resource updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue<Update>(compareBy { it.timestamp }) + private val futureQueue = PriorityQueue<Timer>() /** * The stack of interpreter invocations to occur in the future. @@ -77,13 +77,14 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, get() = batchIndex > 0 /** - * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. - * - * This method should be used when the state of a resource context is invalidated/interrupted and needs to be - * re-computed. In case no interpreter is currently active, the interpreter will be started. + * Update the specified [ctx] synchronously. */ - fun scheduleImmediate(ctx: SimResourceContextImpl) { - queue.add(ctx) + fun scheduleSync(now: Long, ctx: SimResourceContextImpl) { + ctx.doUpdate(now) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) + } // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked // up by the active interpreter. @@ -93,21 +94,20 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, try { batchIndex++ - runInterpreter() + runInterpreter(now) } finally { batchIndex-- } } /** - * Update the specified [ctx] synchronously. + * Enqueue the specified [ctx] to be updated immediately during the active interpreter cycle. + * + * This method should be used when the state of a resource context is invalidated/interrupted and needs to be + * re-computed. In case no interpreter is currently active, the interpreter will be started. */ - fun scheduleSync(ctx: SimResourceContextImpl) { - ctx.doUpdate(clock.millis()) - - if (visited.add(ctx)) { - collectAncestors(ctx, visited) - } + fun scheduleImmediate(now: Long, ctx: SimResourceContextImpl) { + queue.add(ctx) // In-case the interpreter is already running in the call-stack, return immediately. The changes will be picked // up by the active interpreter. @@ -117,35 +117,30 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, try { batchIndex++ - runInterpreter() + runInterpreter(now) } finally { batchIndex-- } } /** - * Schedule the interpreter to run at [timestamp] to update the resource contexts. + * Schedule the interpreter to run at [target] to update the resource contexts. * * This method will override earlier calls to this method for the same [ctx]. * + * @param now The current virtual timestamp. * @param ctx The resource context to which the event applies. - * @param timestamp The timestamp when the interrupt should happen. + * @param target The timestamp when the interrupt should happen. */ - fun scheduleDelayed(ctx: SimResourceContextImpl, timestamp: Long): Update { - val now = clock.millis() + fun scheduleDelayed(now: Long, ctx: SimResourceContextImpl, target: Long): Timer { val futureQueue = futureQueue - require(timestamp >= now) { "Timestamp must be in the future" } + require(target >= now) { "Timestamp must be in the future" } - val update = allocUpdate(ctx, timestamp) - futureQueue.add(update) + val timer = Timer(ctx, target) + futureQueue.add(timer) - // Optimization: Check if we need to push the interruption forward. Note that we check by timer reference. - if (futureQueue.peek() === update) { - trySchedule(futureQueue, futureInvocations) - } - - return update + return timer } override fun newContext( @@ -162,7 +157,7 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, try { // Flush the work if the platform is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - runInterpreter() + runInterpreter(clock.millis()) } } finally { batchIndex-- @@ -172,37 +167,46 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, /** * Interpret all actions that are scheduled for the current timestamp. */ - private fun runInterpreter() { - val now = clock.millis() + private fun runInterpreter(now: Long) { val queue = queue val futureQueue = futureQueue val futureInvocations = futureInvocations val visited = visited + // Remove any entries in the `futureInvocations` queue from the past + while (true) { + val head = futureInvocations.peek() + if (head == null || head.timestamp > now) { + break + } + futureInvocations.poll() + } + // Execute all scheduled updates at current timestamp while (true) { - val update = futureQueue.peek() ?: break + val timer = futureQueue.peek() ?: break + val ctx = timer.ctx + val target = timer.target - assert(update.timestamp >= now) { "Internal inconsistency: found update of the past" } + assert(target >= now) { "Internal inconsistency: found update of the past" } - if (update.timestamp > now && !update.isCancelled) { - // Schedule a task for the next event to occur. - trySchedule(futureQueue, futureInvocations) + if (target > now) { break } futureQueue.poll() - val shouldExecute = !update.isCancelled && update.ctx.requiresUpdate(now) - if (shouldExecute) { - update.ctx.doUpdate(now) + ctx.pruneTimers(now) - if (visited.add(update.ctx)) { - collectAncestors(update.ctx, visited) + if (ctx.shouldUpdate(now)) { + ctx.doUpdate(now) + + if (visited.add(ctx)) { + collectAncestors(ctx, visited) } + } else { + ctx.tryReschedule(now) } - - updatePool.add(update) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -211,9 +215,8 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, // Execute all immediate updates while (true) { val ctx = queue.poll() ?: break - val shouldExecute = ctx.requiresUpdate(now) - if (shouldExecute) { + if (ctx.shouldUpdate(now)) { ctx.doUpdate(now) if (visited.add(ctx)) { @@ -228,51 +231,59 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, visited.clear() } while (queue.isNotEmpty()) + + // Schedule an interpreter invocation for the next update to occur. + val headTimer = futureQueue.peek() + if (headTimer != null) { + trySchedule(now, futureInvocations, headTimer.target) + } } /** - * Try to schedule the next interpreter event. + * Collect all the ancestors of the specified [system]. */ - private fun trySchedule(queue: PriorityQueue<Update>, scheduled: ArrayDeque<Invocation>) { - val nextTimer = queue.peek() - val now = clock.millis() - - // Check whether we need to update our schedule: - if (nextTimer == null) { - // Case 1: all timers are cancelled - for (invocation in scheduled) { - invocation.cancel() - } - scheduled.clear() - return + private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) { + val parent = system.parent + if (parent != null) { + systems.add(parent) + collectAncestors(parent, systems) } + } + /** + * Try to schedule an interpreter invocation at the specified [target]. + * + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the interpreter invocation should happen. + * @param scheduled The queue of scheduled invocations. + */ + private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { while (true) { val invocation = scheduled.peekFirst() - if (invocation == null || invocation.timestamp > nextTimer.timestamp) { + if (invocation == null || invocation.timestamp > target) { // Case 2: A new timer was registered ahead of the other timers. // Solution: Schedule a new scheduler invocation - val nextTimestamp = nextTimer.timestamp @OptIn(InternalCoroutinesApi::class) val handle = delay.invokeOnTimeout( - nextTimestamp - now, + target - now, { try { batchIndex++ - runInterpreter() + runInterpreter(target) } finally { batchIndex-- } }, context ) - scheduled.addFirst(Invocation(nextTimestamp, handle)) + scheduled.addFirst(Invocation(target, handle)) break - } else if (invocation.timestamp < nextTimer.timestamp) { + } else if (invocation.timestamp < target) { // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted // Solution: Cancel the next scheduler invocation - invocation.cancel() scheduled.pollFirst() + + invocation.cancel() } else { break } @@ -280,37 +291,6 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, } /** - * Collect all the ancestors of the specified [system]. - */ - private tailrec fun collectAncestors(system: SimResourceSystem, systems: MutableSet<SimResourceSystem>) { - val parent = system.parent - if (parent != null) { - systems.add(parent) - collectAncestors(parent, systems) - } - } - - /** - * The pool of existing updates. - */ - private val updatePool = ArrayDeque<Update>() - - /** - * Allocate an [Update] object. - */ - private fun allocUpdate(ctx: SimResourceContextImpl, timestamp: Long): Update { - val update = updatePool.poll() - return if (update != null) { - update.ctx = ctx - update.timestamp = timestamp - update.isCancelled = false - update - } else { - Update(ctx, timestamp) - } - } - - /** * A future interpreter invocation. * * This class is used to keep track of the future scheduler invocations created using the [Delay] instance. In case @@ -318,34 +298,25 @@ internal class SimResourceInterpreterImpl(private val context: CoroutineContext, */ private data class Invocation( @JvmField val timestamp: Long, - private val disposableHandle: DisposableHandle + @JvmField val handle: DisposableHandle ) { /** * Cancel the interpreter invocation. */ - fun cancel() = disposableHandle.dispose() + fun cancel() = handle.dispose() } /** - * An update call for [ctx] that is scheduled for [timestamp]. + * An update call for [ctx] that is scheduled for [target]. * - * This class represents an update in the future at [timestamp] requested by [ctx]. A deferred update might be + * This class represents an update in the future at [target] requested by [ctx]. A deferred update might be * cancelled if the resource context was invalidated in the meantime. */ - class Update(@JvmField var ctx: SimResourceContextImpl, @JvmField var timestamp: Long) { - /** - * A flag to indicate that the task has been cancelled. - */ - @JvmField - var isCancelled: Boolean = false - - /** - * Cancel the update. - */ - fun cancel() { - isCancelled = true + class Timer(@JvmField val ctx: SimResourceContextImpl, @JvmField val target: Long) : Comparable<Timer> { + override fun compareTo(other: Timer): Int { + return target.compareTo(other.target) } - override fun toString(): String = "Update[ctx=$ctx,timestamp=$timestamp]" + override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" } } diff --git a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt index 4e57f598..e95e9e42 100644 --- a/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt +++ b/opendc-simulator/opendc-simulator-resources/src/test/kotlin/org/opendc/simulator/resources/SimResourceContextTest.kt @@ -51,7 +51,7 @@ class SimResourceContextTest { val logic = object : SimResourceProviderLogic {} val context = SimResourceContextImpl(null, interpreter, consumer, logic) - context.doUpdate(interpreter.clock.millis()) + interpreter.scheduleSync(interpreter.clock.millis(), context) } @Test @@ -77,7 +77,7 @@ class SimResourceContextTest { context.start() delay(1) // Delay 1 ms to prevent hitting the fast path - context.doUpdate(interpreter.clock.millis()) + interpreter.scheduleSync(interpreter.clock.millis(), context) verify(exactly = 2) { logic.onConsume(any(), any(), any(), any()) } } |
