summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt20
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt15
2 files changed, 25 insertions, 10 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index f62528ed..a4d82a3d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -95,6 +95,11 @@ internal class FlowConsumerContextImpl(
private var _isImmediateUpdateScheduled = false
/**
+ * A flag that indicates to the [FlowEngine] that the context is already enqueued to converge.
+ */
+ private var _willConverge: Boolean = false
+
+ /**
* The timestamp of calls to the callbacks.
*/
private var _lastPull: Long = Long.MIN_VALUE // Last call to `onPull`
@@ -177,12 +182,18 @@ internal class FlowConsumerContextImpl(
}
/**
- * Update the state of the resource context.
+ * Update the state of the flow connection.
+ *
+ * @param now The current virtual timestamp.
+ * @return A flag to indicate whether the connection has already been updated before convergence.
*/
- fun doUpdate(now: Long) {
+ fun doUpdate(now: Long): Boolean {
+ val willConverge = _willConverge
+ _willConverge = true
+
val oldState = _state
if (oldState != State.Active) {
- return
+ return willConverge
}
_isUpdateActive = true
@@ -238,6 +249,8 @@ internal class FlowConsumerContextImpl(
} finally {
_isUpdateActive = false
}
+
+ return willConverge
}
/**
@@ -270,6 +283,7 @@ internal class FlowConsumerContextImpl(
fun onConverge(timestamp: Long) {
val delta = max(0, timestamp - _lastConvergence)
_lastConvergence = timestamp
+ _willConverge = false
try {
if (_state == State.Active) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
index 1a50da2c..5f15fbed 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt
@@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
/**
* The systems that have been visited during the engine cycle.
*/
- private val visited = linkedSetOf<FlowConsumerContextImpl>()
+ private val visited = ArrayDeque<FlowConsumerContextImpl>()
/**
* The index in the batch stack.
@@ -80,8 +80,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
* Update the specified [ctx] synchronously.
*/
fun scheduleSync(now: Long, ctx: FlowConsumerContextImpl) {
- ctx.doUpdate(now)
- visited.add(ctx)
+ if (!ctx.doUpdate(now)) {
+ visited.add(ctx)
+ }
// In-case the engine is already running in the call-stack, return immediately. The changes will be picked
// up by the active engine.
@@ -192,8 +193,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
ctx.pruneTimers(now)
if (ctx.shouldUpdate(now)) {
- ctx.doUpdate(now)
- visited.add(ctx)
+ if (!ctx.doUpdate(now)) {
+ visited.add(ctx)
+ }
} else {
ctx.tryReschedule(now)
}
@@ -206,8 +208,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va
while (true) {
val ctx = queue.poll() ?: break
- if (ctx.shouldUpdate(now)) {
- ctx.doUpdate(now)
+ if (ctx.shouldUpdate(now) && !ctx.doUpdate(now)) {
visited.add(ctx)
}
}