diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
2 files changed, 23 insertions, 14 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index a4d82a3d..fc9c8059 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -109,7 +109,8 @@ internal class FlowConsumerContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque() + private var _timer: FlowEngineImpl.Timer? = null + private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5) override fun start() { check(_state == State.Pending) { "Consumer is already started" } @@ -256,15 +257,10 @@ internal class FlowConsumerContextImpl( /** * Prune the elapsed timers from this context. */ - fun pruneTimers(now: Long) { - val timers = _timers - while (true) { - val head = timers.peek() - if (head == null || head.target > now) { - break - } - timers.poll() - } + fun updateTimers() { + // Invariant: Any pending timer should only point to a future timestamp + // See also `scheduleDelayed` + _timer = _pendingTimers.poll() } /** @@ -363,9 +359,21 @@ internal class FlowConsumerContextImpl( * Schedule a delayed update for this resource context. */ private fun scheduleDelayed(now: Long, target: Long) { - val timers = _timers - if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) { - timers.addFirst(engine.scheduleDelayed(now, this, target)) + // Ignore any target scheduled at the maximum value + // This indicates that the sources does not want to register a timer + if (target == Long.MAX_VALUE) { + return + } + + val timer = _timer + + if (timer == null) { + // No existing timer exists, so schedule a new timer and update the head + _timer = engine.scheduleDelayed(now, this, target) + } else if (target < timer.target) { + // Existing timer is further in the future, so schedule a new timer ahead of it + _timer = engine.scheduleDelayed(now, this, target) + _pendingTimers.addFirst(timer) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 5f15fbed..c8170a43 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -190,7 +190,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va futureQueue.poll() - ctx.pruneTimers(now) + // Update the existing timers of the connection + ctx.updateTimers() if (ctx.shouldUpdate(now)) { if (!ctx.doUpdate(now)) { |
