diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-04 16:43:02 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-05 15:03:02 +0200 |
| commit | 5c4f9d936d7c08e8ad2705ed3dde5ea8dcd2ee64 (patch) | |
| tree | 41886e635916dd29f4ef2b8a58e9cd984c337c48 /opendc-simulator/opendc-simulator-flow/src/main | |
| parent | b92d0e8703014f143ff0b1fe67de09fff6f867b1 (diff) | |
perf(simulator): Do not prune invocations on sync engine invocation
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main')
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt | 112 |
1 files changed, 49 insertions, 63 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 019b5f10..a9234abf 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -82,7 +82,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va return } - runEngine(now) + doRunEngine(now) } /** @@ -100,7 +100,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va return } - runEngine(now) + doRunEngine(now) } override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) @@ -120,16 +120,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va } } - /** - * Run the engine and mark as active while running. - */ - private fun runEngine(now: Long) { - try { - batchIndex++ - doRunEngine(now) - } finally { - batchIndex-- - } + /* Runnable */ + override fun run() { + val now = clock.millis() + val invocation = futureInvocations.poll() // Clear invocation from future invocation queue + assert(now >= invocation.timestamp) { "Future invocations invariant violated" } + + doRunEngine(now) } /** @@ -141,44 +138,43 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va val futureInvocations = futureInvocations val visited = visited - // Remove any entries in the `futureInvocations` queue from the past - while (true) { - val head = futureInvocations.peek() - if (head == null || head.timestamp > now) { - break - } - futureInvocations.poll() - } - - // Execute all scheduled updates at current timestamp - while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target + try { + // Increment batch index so synchronous calls will not launch concurrent engine invocations + batchIndex++ - if (target > now) { - break - } + // Execute all scheduled updates at current timestamp + while (true) { + val timer = futureQueue.peek() ?: break + val target = timer.target - assert(target >= now) { "Internal inconsistency: found update of the past" } + if (target > now) { + break + } - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) - } + assert(target >= now) { "Internal inconsistency: found update of the past" } - // Repeat execution of all immediate updates until the system has converged to a steady-state - // We have to take into account that the onConverge callback can also trigger new actions. - do { - // Execute all immediate updates - while (true) { - val ctx = queue.poll() ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) + futureQueue.poll() + timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } - while (true) { - val ctx = visited.poll() ?: break - ctx.onConverge(now) - } - } while (queue.isNotEmpty()) + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val ctx = queue.poll() ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) + } + + while (true) { + val ctx = visited.poll() ?: break + ctx.onConverge(now) + } + } while (queue.isNotEmpty()) + } finally { + // Decrement batch index to indicate no engine is active at the moment + batchIndex-- + } // Schedule an engine invocation for the next update to occur. val headTimer = futureQueue.peek() @@ -195,24 +191,14 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * @param scheduled The queue of scheduled invocations. */ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { - while (true) { - val invocation = scheduled.peekFirst() - if (invocation == null || invocation.timestamp > target) { - // Case 2: A new timer was registered ahead of the other timers. - // Solution: Schedule a new scheduler invocation - @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context) - scheduled.addFirst(Invocation(target, handle)) - break - } else if (invocation.timestamp < target) { - // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted - // Solution: Cancel the next scheduler invocation - scheduled.pollFirst() - - invocation.cancel() - } else { - break - } + val head = scheduled.peek() + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout(target - now, this, context) + scheduled.addFirst(Invocation(target, handle)) } } |
