summaryrefslogtreecommitdiff
path: root/opendc-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt2
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt43
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt338
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt112
4 files changed, 241 insertions, 254 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
index 549a338b..b4eb6a38 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt
@@ -44,6 +44,8 @@ public class FlowSink(
override fun createLogic(): FlowConsumerLogic {
return object : FlowConsumerLogic {
+ private val parent = this@FlowSink.parent
+
override fun onPush(
ctx: FlowConsumerContext,
now: Long,
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
new file mode 100644
index 00000000..81ed9f26
--- /dev/null
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.simulator.flow.internal
+
+/**
+ * States of the flow connection.
+ */
+internal const val ConnPending = 0 // Connection is pending and the consumer is waiting to consume the source
+internal const val ConnActive = 1 // Connection is active and the source is currently being consumed
+internal const val ConnClosed = 2 // Connection is closed and source cannot be consumed through this connection anymore
+internal const val ConnState = 0b11 // Mask for accessing the state of the flow connection
+
+/**
+ * Flags of the flow connection
+ */
+internal const val ConnPulled = 1 shl 2 // The source should be pulled
+internal const val ConnPushed = 1 shl 3 // The source has pushed a value
+internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active
+internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending
+internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary
+internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
+internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
+internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index c087a28d..9d36483e 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -24,7 +24,7 @@ package org.opendc.simulator.flow.internal
import mu.KotlinLogging
import org.opendc.simulator.flow.*
-import java.util.ArrayDeque
+import java.util.*
import kotlin.math.max
import kotlin.math.min
@@ -44,20 +44,18 @@ internal class FlowConsumerContextImpl(
/**
* The capacity of the connection.
*/
- override var capacity: Double = 0.0
+ override var capacity: Double
+ get() = _capacity
set(value) {
- val oldValue = field
+ val oldValue = _capacity
// Only changes will be propagated
if (value != oldValue) {
- field = value
-
- // Do not pull the source if it has not been started yet
- if (_state == State.Active) {
- pull()
- }
+ _capacity = value
+ pull()
}
}
+ private var _capacity: Double = 0.0
/**
* The current processing rate of the connection.
@@ -75,8 +73,25 @@ internal class FlowConsumerContextImpl(
/**
* Flags to control the convergence of the consumer and source.
*/
- override var shouldConsumerConverge: Boolean = false
override var shouldSourceConverge: Boolean = false
+ set(value) {
+ field = value
+ _flags =
+ if (value)
+ _flags or ConnConvergeSource
+ else
+ _flags and ConnConvergeSource.inv()
+ }
+ override var shouldConsumerConverge: Boolean = false
+ set(value) {
+ field = value
+
+ _flags =
+ if (value)
+ _flags or ConnConvergeConsumer
+ else
+ _flags and ConnConvergeConsumer.inv()
+ }
/**
* The clock to track simulation time.
@@ -84,36 +99,15 @@ internal class FlowConsumerContextImpl(
private val _clock = engine.clock
/**
- * A flag to indicate the state of the connection.
- */
- private var _state = State.Pending
-
- /**
* The current state of the connection.
*/
private var _demand: Double = 0.0 // The current (pending) demand of the source
- private var _activeDemand: Double = 0.0 // The previous demand of the source
private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
/**
- * A flag to indicate that the source should be pulled.
- */
- private var _isPulled = false
-
- /**
- * A flag to indicate that an update is active.
- */
- private var _isUpdateActive = false
-
- /**
- * A flag to indicate that an immediate update is scheduled.
+ * The flags of the flow connection, indicating certain actions.
*/
- private var _isImmediateUpdateScheduled = false
-
- /**
- * A flag that indicates to the [FlowEngine] that the context is already enqueued to converge.
- */
- private var _willConverge: Boolean = false
+ private var _flags: Int = 0
/**
* The timestamp of calls to the callbacks.
@@ -130,44 +124,53 @@ internal class FlowConsumerContextImpl(
private val _pendingTimers: ArrayDeque<FlowEngineImpl.Timer> = ArrayDeque(5)
override fun start() {
- check(_state == State.Pending) { "Consumer is already started" }
+ check(_flags and ConnState == ConnPending) { "Consumer is already started" }
engine.batch {
- source.onStart(this, _clock.millis())
- _state = State.Active
+ val now = _clock.millis()
+ source.onStart(this, now)
- pull()
+ // Mark the connection as active and pulled
+ val newFlags = (_flags and ConnState.inv()) or ConnActive or ConnPulled
+ scheduleImmediate(now, newFlags)
}
}
override fun close() {
- if (_state == State.Closed) {
+ var flags = _flags
+ if (flags and ConnState == ConnClosed) {
return
}
engine.batch {
- _state = State.Closed
- if (!_isUpdateActive) {
+ // Mark the connection as closed and pulled (in order to converge)
+ flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled
+ _flags = flags
+
+ if (flags and ConnUpdateActive == 0) {
val now = _clock.millis()
doStopSource(now)
// FIX: Make sure the context converges
- pull()
+ scheduleImmediate(now, flags)
}
}
}
override fun pull() {
- if (_state == State.Closed) {
+ val flags = _flags
+ if (flags and ConnState != ConnActive) {
return
}
- _isPulled = true
- scheduleImmediate()
+ // Mark connection as pulled
+ scheduleImmediate(_clock.millis(), flags or ConnPulled)
}
override fun flush() {
+ val flags = _flags
+
// Do not attempt to flush the connection if the connection is closed or an update is already active
- if (_state == State.Closed || _isUpdateActive) {
+ if (flags and ConnState != ConnActive || flags and ConnUpdateActive != 0) {
return
}
@@ -181,118 +184,145 @@ internal class FlowConsumerContextImpl(
_demand = rate
- // Invalidate only if the active demand is changed and no update is active
- // If an update is active, it will already get picked up at the end of the update
- if (_activeDemand != rate && !_isUpdateActive) {
- scheduleImmediate()
- }
- }
+ val flags = _flags
- /**
- * Determine whether the state of the flow connection should be updated.
- */
- fun shouldUpdate(timestamp: Long): Boolean {
- // The flow connection should be updated for three reasons:
- // (1) The source should be pulled (after a call to `pull`)
- // (2) The demand of the source has changed (after a call to `push`)
- // (3) The timer of the source expired
- return _isPulled || _demand != _activeDemand || _deadline == timestamp
+ if (flags and ConnUpdateActive != 0) {
+ // If an update is active, it will already get picked up at the end of the update
+ _flags = flags or ConnPushed
+ } else {
+ // Invalidate only if no update is active
+ scheduleImmediate(_clock.millis(), flags or ConnPushed)
+ }
}
/**
* Update the state of the flow connection.
*
* @param now The current virtual timestamp.
- * @return A flag to indicate whether the connection has already been updated before convergence.
+ * @param visited The queue of connections that have been visited during the cycle.
+ * @param timerQueue The queue of all pending timers.
+ * @param isImmediate A flag to indicate that this invocation is an immediate update or a delayed update.
*/
- fun doUpdate(now: Long): Boolean {
- // The connection will only converge if either the source or the consumer wants the converge callback to be
- // invoked.
- val shouldConverge = shouldSourceConverge || shouldConsumerConverge
- var willConverge = false
- if (shouldConverge) {
- willConverge = _willConverge
- _willConverge = true
- }
-
- val oldState = _state
- if (oldState != State.Active) {
- return willConverge
+ fun doUpdate(
+ now: Long,
+ visited: ArrayDeque<FlowConsumerContextImpl>,
+ timerQueue: PriorityQueue<FlowEngineImpl.Timer>,
+ isImmediate: Boolean
+ ) {
+ var flags = _flags
+
+ // Precondition: The flow connection must be active
+ if (flags and ConnState != ConnActive) {
+ return
}
- _isUpdateActive = true
- _isImmediateUpdateScheduled = false
+ val deadline = _deadline
+ val reachedDeadline = deadline == now
+ var newDeadline = deadline
+ var hasUpdated = false
try {
// Pull the source if (1) `pull` is called or (2) the timer of the source has expired
- val deadline = if (_isPulled || _deadline == now) {
+ newDeadline = if (flags and ConnPulled != 0 || reachedDeadline) {
val lastPull = _lastPull
val delta = max(0, now - lastPull)
- _isPulled = false
+ // Update state before calling into the outside world, so it observes a consistent state
_lastPull = now
+ _flags = (flags and ConnPulled.inv()) or ConnUpdateActive
+ hasUpdated = true
val duration = source.onPull(this, now, delta)
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = _flags
+
if (duration != Long.MAX_VALUE)
now + duration
else
duration
} else {
- _deadline
+ deadline
}
- // Check whether the state has changed after [consumer.onNext]
- when (_state) {
- State.Active -> {
- val demand = _demand
- if (demand != _activeDemand) {
- val lastPush = _lastPush
- val delta = max(0, now - lastPush)
-
- _lastPush = now
-
- logic.onPush(this, now, delta, demand)
- }
- }
- State.Closed -> doStopSource(now)
- State.Pending -> throw IllegalStateException("Illegal transition to pending state")
- }
+ // Push to the consumer if the rate of the source has changed (after a call to `push`)
+ val newState = flags and ConnState
+ if (newState == ConnActive && flags and ConnPushed != 0) {
+ val lastPush = _lastPush
+ val delta = max(0, now - lastPush)
- // Note: pending limit might be changed by [logic.onConsume], so re-fetch the value
- val newLimit = _demand
+ // Update state before calling into the outside world, so it observes a consistent state
+ _lastPush = now
+ _flags = (flags and ConnPushed.inv()) or ConnUpdateActive
+ hasUpdated = true
- // Flush the changes to the flow
- _activeDemand = newLimit
- _deadline = deadline
- _rate = min(capacity, newLimit)
+ logic.onPush(this, now, delta, _demand)
- // Schedule an update at the new deadline
- scheduleDelayed(now, deadline)
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ flags = _flags
+ } else if (newState == ConnClosed) {
+ hasUpdated = true
+
+ // The source has called [FlowConnection.close], so clean up the connection
+ doStopSource(now)
+ }
} catch (cause: Throwable) {
+ // Mark the connection as closed
+ flags = (flags and ConnState.inv()) or ConnClosed
+
doFailSource(now, cause)
- } finally {
- _isUpdateActive = false
}
- return willConverge
- }
+ // Check whether the connection needs to be added to the visited queue. This is the case when:
+ // (1) An update was performed (either a push or a pull)
+ // (2) Either the source or consumer want to converge, and
+ // (3) Convergence is not already pending (ConnConvergePending)
+ if (hasUpdated && flags and (ConnConvergeSource or ConnConvergeConsumer) != 0 && flags and ConnConvergePending == 0) {
+ visited.add(this)
+ flags = flags or ConnConvergePending
+ }
- /**
- * Prune the elapsed timers from this context.
- */
- fun updateTimers() {
- // Invariant: Any pending timer should only point to a future timestamp
- // See also `scheduleDelayed`
- _timer = _pendingTimers.poll()
- }
+ // Compute the new flow rate of the connection
+ // Note: _demand might be changed by [logic.onConsume], so we must re-fetch the value
+ _rate = min(_capacity, _demand)
- /**
- * Try to re-schedule the resource context in case it was skipped.
- */
- fun tryReschedule(now: Long) {
- val deadline = _deadline
- if (deadline > now && deadline != Long.MAX_VALUE) {
- scheduleDelayed(now, deadline)
+ // Indicate that no update is active anymore and flush the flags
+ _flags = flags and ConnUpdateActive.inv() and ConnUpdatePending.inv()
+
+ val pendingTimers = _pendingTimers
+
+ // Prune the head timer if this is a delayed update
+ val timer = if (!isImmediate) {
+ // Invariant: Any pending timer should only point to a future timestamp
+ // See also `scheduleDelayed`
+ val timer = pendingTimers.poll()
+ _timer = timer
+ timer
+ } else {
+ _timer
+ }
+
+ // Set the new deadline and schedule a delayed update for that deadline
+ _deadline = newDeadline
+
+ // Check whether we need to schedule a new timer for this connection. That is the case when:
+ // (1) The deadline is valid (not the maximum value)
+ // (2) The connection is active
+ // (3) The current active timer for the connection points to a later deadline
+ if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) {
+ // Ignore any deadline scheduled at the maximum value
+ // This indicates that the source does not want to register a timer
+ return
+ }
+
+ // Construct a timer for the new deadline and add it to the global queue of timers
+ val newTimer = FlowEngineImpl.Timer(this, newDeadline)
+ _timer = newTimer
+ timerQueue.add(newTimer)
+
+ // A timer already exists for this connection, so add it to the queue of pending timers
+ if (timer != null) {
+ pendingTimers.addFirst(timer)
}
}
@@ -300,17 +330,22 @@ internal class FlowConsumerContextImpl(
* This method is invoked when the system converges into a steady state.
*/
fun onConverge(now: Long) {
- _willConverge = false
-
try {
- if (_state == State.Active && shouldSourceConverge) {
+ val flags = _flags
+
+ // The connection is converging now, so unset the convergence pending flag
+ _flags = flags and ConnConvergePending.inv()
+
+ // Call the source converge callback if it has enabled convergence and the connection is active
+ if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) {
val delta = max(0, now - _lastSourceConvergence)
_lastSourceConvergence = now
source.onConverge(this, now, delta)
}
- if (shouldConsumerConverge) {
+ // Call the consumer callback if it has enabled convergence
+ if (flags and ConnConvergeConsumer != 0) {
val delta = max(0, now - _lastConsumerConvergence)
_lastConsumerConvergence = now
@@ -369,57 +404,16 @@ internal class FlowConsumerContextImpl(
/**
* Schedule an immediate update for this connection.
*/
- private fun scheduleImmediate() {
+ private fun scheduleImmediate(now: Long, flags: Int) {
// In case an immediate update is already scheduled, no need to do anything
- if (_isImmediateUpdateScheduled) {
+ if (flags and ConnUpdatePending != 0) {
+ _flags = flags
return
}
- _isImmediateUpdateScheduled = true
+ // Mark the connection that there is an update pending
+ _flags = flags or ConnUpdatePending
- val now = _clock.millis()
engine.scheduleImmediate(now, this)
}
-
- /**
- * Schedule a delayed update for this resource context.
- */
- private fun scheduleDelayed(now: Long, target: Long) {
- // Ignore any target scheduled at the maximum value
- // This indicates that the sources does not want to register a timer
- if (target == Long.MAX_VALUE) {
- return
- }
-
- val timer = _timer
-
- if (timer == null) {
- // No existing timer exists, so schedule a new timer and update the head
- _timer = engine.scheduleDelayed(now, this, target)
- } else if (target < timer.target) {
- // Existing timer is further in the future, so schedule a new timer ahead of it
- _timer = engine.scheduleDelayed(now, this, target)
- _pendingTimers.addFirst(timer)
- }
- }
-
- /**
- * The state of a flow connection.
- */
- private enum class State {
- /**
- * The connection is pending and the consumer is waiting to consume the source.
- */
- Pending,
-
- /**
- * The connection is active and the source is currently being consumed.
- */
- Active,
-
- /**
- * The connection is closed and the source cannot be consumed through this connection anymore.
- */
- Closed
- }
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index c8170a43..019b5f10 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/**
* The systems that have been visited during the engine cycle.
*/
- private val visited = ArrayDeque<FlowConsumerContextImpl>()
+ private val visited: ArrayDeque<FlowConsumerContextImpl> = ArrayDeque()
/**
* The index in the batch stack.
@@ -71,31 +71,18 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
private var batchIndex = 0
/**
- * A flag to indicate that the engine is currently active.
- */
- private val isRunning: Boolean
- get() = batchIndex > 0
-
- /**
* Update the specified [ctx] synchronously.
*/
fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
- if (!ctx.doUpdate(now)) {
- visited.add(ctx)
- }
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
// up by the active engine.
- if (isRunning) {
+ if (batchIndex > 0) {
return
}
- try {
- batchIndex++
- runEngine(now)
- } finally {
- batchIndex--
- }
+ runEngine(now)
}
/**
@@ -109,36 +96,11 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
// up by the active engine.
- if (isRunning) {
+ if (batchIndex > 0) {
return
}
- try {
- batchIndex++
- runEngine(now)
- } finally {
- batchIndex--
- }
- }
-
- /**
- * Schedule the engine to run at [target] to update the flow contexts.
- *
- * This method will override earlier calls to this method for the same [ctx].
- *
- * @param now The current virtual timestamp.
- * @param ctx The flow context to which the event applies.
- * @param target The timestamp when the interrupt should happen.
- */
- fun scheduleDelayed(now: Long, ctx: FlowConsumerContextImpl, target: Long): Timer {
- val futureQueue = futureQueue
-
- require(target >= now) { "Timestamp must be in the future" }
-
- val timer = Timer(ctx, target)
- futureQueue.add(timer)
-
- return timer
+ runEngine(now)
}
override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider)
@@ -149,9 +111,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
override fun popBatch() {
try {
- // Flush the work if the platform is not already running
+ // Flush the work if the engine is not already running
if (batchIndex == 1 && queue.isNotEmpty()) {
- runEngine(clock.millis())
+ doRunEngine(clock.millis())
}
} finally {
batchIndex--
@@ -159,9 +121,21 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
}
/**
- * Run all the enqueued actions for the specified [timestamp][now].
+ * Run the engine and mark as active while running.
*/
private fun runEngine(now: Long) {
+ try {
+ batchIndex++
+ doRunEngine(now)
+ } finally {
+ batchIndex--
+ }
+ }
+
+ /**
+ * Run all the enqueued actions for the specified [timestamp][now].
+ */
+ private fun doRunEngine(now: Long) {
val queue = queue
val futureQueue = futureQueue
val futureInvocations = futureInvocations
@@ -179,27 +153,16 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
// Execute all scheduled updates at current timestamp
while (true) {
val timer = futureQueue.peek() ?: break
- val ctx = timer.ctx
val target = timer.target
- assert(target >= now) { "Internal inconsistency: found update of the past" }
-
if (target > now) {
break
}
- futureQueue.poll()
-
- // Update the existing timers of the connection
- ctx.updateTimers()
+ assert(target >= now) { "Internal inconsistency: found update of the past" }
- if (ctx.shouldUpdate(now)) {
- if (!ctx.doUpdate(now)) {
- visited.add(ctx)
- }
- } else {
- ctx.tryReschedule(now)
- }
+ futureQueue.poll()
+ timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false)
}
// Repeat execution of all immediate updates until the system has converged to a steady-state
@@ -208,17 +171,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
// Execute all immediate updates
while (true) {
val ctx = queue.poll() ?: break
-
- if (ctx.shouldUpdate(now) && !ctx.doUpdate(now)) {
- visited.add(ctx)
- }
+ ctx.doUpdate(now, visited, futureQueue, isImmediate = true)
}
- for (system in visited) {
- system.onConverge(now)
+ while (true) {
+ val ctx = visited.poll() ?: break
+ ctx.onConverge(now)
}
-
- visited.clear()
} while (queue.isNotEmpty())
// Schedule an engine invocation for the next update to occur.
@@ -242,18 +201,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
// Case 2: A new timer was registered ahead of the other timers.
// Solution: Schedule a new scheduler invocation
@OptIn(InternalCoroutinesApi::class)
- val handle = delay.invokeOnTimeout(
- target - now,
- {
- try {
- batchIndex++
- runEngine(target)
- } finally {
- batchIndex--
- }
- },
- context
- )
+ val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context)
scheduled.addFirst(Invocation(target, handle))
break
} else if (invocation.timestamp < target) {
@@ -274,7 +222,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
* This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
* the invocation is not needed anymore, it can be cancelled via [cancel].
*/
- private data class Invocation(
+ private class Invocation(
@JvmField val timestamp: Long,
@JvmField val handle: DisposableHandle
) {