diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
2 files changed, 25 insertions, 10 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index f62528ed..a4d82a3d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -95,6 +95,11 @@ internal class FlowConsumerContextImpl( private var _isImmediateUpdateScheduled = false /** + * A flag that indicates to the [FlowEngine] that the context is already enqueued to converge. + */ + private var _willConverge: Boolean = false + + /** * The timestamp of calls to the callbacks. */ private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull` @@ -177,12 +182,18 @@ internal class FlowConsumerContextImpl( } /** - * Update the state of the resource context. + * Update the state of the flow connection. + * + * @param now The current virtual timestamp. + * @return A flag to indicate whether the connection has already been updated before convergence. */ - fun doUpdate(now: Long) { + fun doUpdate(now: Long): Boolean { + val willConverge = _willConverge + _willConverge = true + val oldState = _state if (oldState != State.Active) { - return + return willConverge } _isUpdateActive = true @@ -238,6 +249,8 @@ internal class FlowConsumerContextImpl( } finally { _isUpdateActive = false } + + return willConverge } /** @@ -270,6 +283,7 @@ internal class FlowConsumerContextImpl( fun onConverge(timestamp: Long) { val delta = max(0, timestamp - _lastConvergence) _lastConvergence = timestamp + _willConverge = false try { if (_state == State.Active) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 1a50da2c..5f15fbed 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /** * The systems that have been visited during the engine cycle. */ - private val visited = linkedSetOf<FlowConsumerContextImpl>() + private val visited = ArrayDeque<FlowConsumerContextImpl>() /** * The index in the batch stack. @@ -80,8 +80,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * Update the specified [ctx] synchronously. */ fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) { - ctx.doUpdate(now) - visited.add(ctx) + if (!ctx.doUpdate(now)) { + visited.add(ctx) + } // In-case the engine is already running in the call-stack, return immediately. The changes will be picked // up by the active engine. @@ -192,8 +193,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va ctx.pruneTimers(now) if (ctx.shouldUpdate(now)) { - ctx.doUpdate(now) - visited.add(ctx) + if (!ctx.doUpdate(now)) { + visited.add(ctx) + } } else { ctx.tryReschedule(now) } @@ -206,8 +208,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va while (true) { val ctx = queue.poll() ?: break - if (ctx.shouldUpdate(now)) { - ctx.doUpdate(now) + if (ctx.shouldUpdate(now) && !ctx.doUpdate(now)) { visited.add(ctx) } } |
