summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt34
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt3
2 files changed, 23 insertions, 14 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index a4d82a3d..fc9c8059 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -109,7 +109,8 @@ internal class FlowConsumerContextImpl(
/**
* The timers at which the context is scheduled to be interrupted.
*/
- private val _timers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque()
+ private var _timer: FlowEngineImpl.Timer? = null
+ private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5)
override fun start() {
check(_state == State.Pending) { "Consumer is already started" }
@@ -256,15 +257,10 @@ internal class FlowConsumerContextImpl(
/**
* Prune the elapsed timers from this context.
*/
- fun pruneTimers(now: Long) {
- val timers = _timers
- while (true) {
- val head = timers.peek()
- if (head == null || head.target > now) {
- break
- }
- timers.poll()
- }
+ fun updateTimers() {
+ // Invariant: Any pending timer should only point to a future timestamp
+ // See also `scheduleDelayed`
+ _timer = _pendingTimers.poll()
}
/**
@@ -363,9 +359,21 @@ internal class FlowConsumerContextImpl(
* Schedule a delayed update for this resource context.
*/
private fun scheduleDelayed(now: Long, target: Long) {
- val timers = _timers
- if (target != Long.MAX_VALUE && (timers.isEmpty() || target < timers.peek().target)) {
- timers.addFirst(engine.scheduleDelayed(now, this, target))
+ // Ignore any target scheduled at the maximum value
+ // This indicates that the sources does not want to register a timer
+ if (target == Long.MAX_VALUE) {
+ return
+ }
+
+ val timer = _timer
+
+ if (timer == null) {
+ // No existing timer exists, so schedule a new timer and update the head
+ _timer = engine.scheduleDelayed(now, this, target)
+ } else if (target < timer.target) {
+ // Existing timer is further in the future, so schedule a new timer ahead of it
+ _timer = engine.scheduleDelayed(now, this, target)
+ _pendingTimers.addFirst(timer)
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 5f15fbed..c8170a43 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -190,7 +190,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
futureQueue.poll()
- ctx.pruneTimers(now)
+ // Update the existing timers of the connection
+ ctx.updateTimers()
if (ctx.shouldUpdate(now)) {
if (!ctx.doUpdate(now)) {