summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-04 16:43:02 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-05 15:03:02 +0200
commit5c4f9d936d7c08e8ad2705ed3dde5ea8dcd2ee64 (patch)
tree41886e635916dd29f4ef2b8a58e9cd984c337c48 /opendc-simulator/opendc-simulator-flow/src
parentb92d0e8703014f143ff0b1fe67de09fff6f867b1 (diff)
perf(simulator): Do not prune invocations on sync engine invocation
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt112
1 files changed, 49 insertions, 63 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 019b5f10..a9234abf 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext
* @param context The coroutine context to use.
* @param clock The virtual simulation clock.
*/
-internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine {
+internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable {
/**
* The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
@@ -82,7 +82,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
return
}
- runEngine(now)
+ doRunEngine(now)
}
/**
@@ -100,7 +100,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
return
}
- runEngine(now)
+ doRunEngine(now)
}
override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
@@ -120,16 +120,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
}
}
- /**
- * Run the engine and mark as active while running.
- */
- private fun runEngine(now: Long) {
- try {
- batchIndex++
- doRunEngine(now)
- } finally {
- batchIndex--
- }
+ /* Runnable */
+ override fun run() {
+ val now = clock.millis()
+ val invocation = futureInvocations.poll() // Clear invocation from future invocation queue
+ assert(now >= invocation.timestamp) { "Future invocations invariant violated" }
+
+ doRunEngine(now)
}
/**
@@ -141,44 +138,43 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
val futureInvocations = futureInvocations
val visited = visited
- // Remove any entries in the `futureInvocations` queue from the past
- while (true) {
- val head = futureInvocations.peek()
- if (head == null || head.timestamp > now) {
- break
- }
- futureInvocations.poll()
- }
-
- // Execute all scheduled updates at current timestamp
- while (true) {
- val timer = futureQueue.peek() ?: break
- val target = timer.target
+ try {
+ // Increment batch index so synchronous calls will not launch concurrent engine invocations
+ batchIndex++
- if (target > now) {
- break
- }
+ // Execute all scheduled updates at current timestamp
+ while (true) {
+ val timer = futureQueue.peek() ?: break
+ val target = timer.target
- assert(target >= now) { "Internal inconsistency: found update of the past" }
+ if (target > now) {
+ break
+ }
- futureQueue.poll()
- timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
- }
+ assert(target >= now) { "Internal inconsistency: found update of the past" }
- // Repeat execution of all immediate updates until the system has converged to a steady-state
- // We have to take into account that the onConverge callback can also trigger new actions.
- do {
- // Execute all immediate updates
- while (true) {
- val ctx = queue.poll() ?: break
- ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
+ futureQueue.poll()
+ timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
}
- while (true) {
- val ctx = visited.poll() ?: break
- ctx.onConverge(now)
- }
- } while (queue.isNotEmpty())
+ // Repeat execution of all immediate updates until the system has converged to a steady-state
+ // We have to take into account that the onConverge callback can also trigger new actions.
+ do {
+ // Execute all immediate updates
+ while (true) {
+ val ctx = queue.poll() ?: break
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
+ }
+
+ while (true) {
+ val ctx = visited.poll() ?: break
+ ctx.onConverge(now)
+ }
+ } while (queue.isNotEmpty())
+ } finally {
+ // Decrement batch index to indicate no engine is active at the moment
+ batchIndex--
+ }
// Schedule an engine invocation for the next update to occur.
val headTimer = futureQueue.peek()
@@ -195,24 +191,14 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
* @param scheduled The queue of scheduled invocations.
*/
private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
- while (true) {
- val invocation = scheduled.peekFirst()
- if (invocation == null || invocation.timestamp > target) {
- // Case 2: A new timer was registered ahead of the other timers.
- // Solution: Schedule a new scheduler invocation
- @OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context)
- scheduled.addFirst(Invocation(target, handle))
- break
- } else if (invocation.timestamp < target) {
- // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted
- // Solution: Cancel the next scheduler invocation
- scheduled.pollFirst()
-
- invocation.cancel()
- } else {
- break
- }
+ val head = scheduled.peek()
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(target - now, this, context)
+ scheduled.addFirst(Invocation(target, handle))
}
}