diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-01-11 13:53:07 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-02-18 16:53:52 +0100 |
| commit | 841eaeb84a96cb1b20172b1ab293ebef0bb573a5 (patch) | |
| tree | 1c744a0ad30d739de09bc5ffe7b1631cee925de3 /opendc-simulator/opendc-simulator-flow/src | |
| parent | 2615c1c3e6c3f79bc14386398bb1d14d65c17512 (diff) | |
fix(simulator): Flush results before accessing counters
This change updates the simulator implementation to flush the active
progress when accessing the hypervisor counters. Previously, if the
counters were accessed, while the mux or consumer was in progress, its
counter values were not accurate.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
3 files changed, 43 insertions, 13 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index a7877546..5f198944 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -105,4 +105,14 @@ public interface FlowMultiplexer { * Clear the outputs of the multiplexer. */ public fun clearOutputs() + + /** + * Flush the counters of the multiplexer. + */ + public fun flushCounters() + + /** + * Flush the counters of the specified [input]. + */ + public fun flushCounters(input: FlowConsumer) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index eff111b8..7fe0c1b7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -135,6 +135,10 @@ public class ForwardingFlowMultiplexer( clearInputs() } + override fun flushCounters() {} + + override fun flushCounters(input: FlowConsumer) {} + private var _lastConverge = Long.MAX_VALUE override fun onConverge(now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 3d26efda..7ee4d326 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer( clearInputs() } + override fun flushCounters() { + scheduler.updateCounters(engine.clock.millis()) + } + + override fun flushCounters(input: FlowConsumer) { + (input as Input).doUpdateCounters(engine.clock.millis()) + } + /** * Helper class containing the scheduler state. */ @@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer( * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE /** * The last convergence timestamp and the input. @@ -324,14 +331,9 @@ public class MaxMinFlowMultiplexer( * Synchronously run the scheduler of the multiplexer. */ fun runScheduler(now: Long): Long { - val lastSchedulerCycle = _lastSchedulerCycle - _lastSchedulerCycle = now - - val delta = max(0, now - lastSchedulerCycle) - return try { _schedulerActive = true - doRunScheduler(now, delta) + doRunScheduler(now) } finally { _schedulerActive = false } @@ -384,14 +386,14 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(now: Long, delta: Long): Long { + private fun doRunScheduler(now: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs var inputArray = _inputArray var inputSize = _inputArray.size // Update the counters of the scheduler - updateCounters(delta) + updateCounters(now) // If there is no work yet, mark the inputs as idle. if (inputSize == 0) { @@ -475,14 +477,19 @@ public class MaxMinFlowMultiplexer( * The previous capacity of the multiplexer. */ private var _previousCapacity = 0.0 + private var _previousUpdate = Long.MIN_VALUE /** * Update the counters of the scheduler. */ - private fun updateCounters(delta: Long) { + fun updateCounters(now: Long) { val previousCapacity = _previousCapacity _previousCapacity = capacity + val previousUpdate = _previousUpdate + _previousUpdate = now + + val delta = (now - previousUpdate).coerceAtLeast(0) if (delta <= 0) { return } @@ -643,7 +650,7 @@ public class MaxMinFlowMultiplexer( delta: Long, rate: Double ) { - doUpdateCounters(delta) + doUpdateCounters(now) val allowed = min(rate, capacity) limit = rate @@ -654,7 +661,7 @@ public class MaxMinFlowMultiplexer( } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - doUpdateCounters(delta) + doUpdateCounters(now) limit = 0.0 actualRate = 0.0 @@ -673,9 +680,18 @@ public class MaxMinFlowMultiplexer( override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) /** + * The timestamp that the counters where last updated. + */ + private var _lastUpdate = Long.MIN_VALUE + + /** * Helper method to update the flow counters of the multiplexer. */ - private fun doUpdateCounters(delta: Long) { + fun doUpdateCounters(now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + + val delta = (now - lastUpdate).coerceAtLeast(0) if (delta <= 0L) { return } |
