From 841eaeb84a96cb1b20172b1ab293ebef0bb573a5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Jan 2022 13:53:07 +0100 Subject: fix(simulator): Flush results before accessing counters This change updates the simulator implementation to flush the active progress when accessing the hypervisor counters. Previously, if the counters were accessed, while the mux or consumer was in progress, its counter values were not accurate. --- .../compute/kernel/SimAbstractHypervisor.kt | 21 +++++++++-- .../compute/kernel/SimHypervisorCounters.kt | 5 +++ .../opendc/simulator/flow/mux/FlowMultiplexer.kt | 10 ++++++ .../flow/mux/ForwardingFlowMultiplexer.kt | 4 +++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 42 +++++++++++++++------- 5 files changed, 67 insertions(+), 15 deletions(-) (limited to 'opendc-simulator') diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 07465126..b7f70749 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -137,7 +137,9 @@ public abstract class SimAbstractHypervisor( /* FlowConvergenceListener */ override fun onConverge(now: Long, delta: Long) { - _counters.record() + if (delta > 0) { + _counters.record() + } val load = cpuDemand / cpuCapacity for (governor in governors) { @@ -274,6 +276,10 @@ public abstract class SimAbstractHypervisor( fun close() { switch.removeInput(source) } + + fun flush() { + switch.flushCounters(source) + } } /** @@ -337,6 +343,11 @@ public abstract class SimAbstractHypervisor( cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() cpuTime[3] += (interferenceDelta * d).roundToLong() } + + override fun flush() { + hv.mux.flushCounters() + record() + } } /** @@ -348,10 +359,16 @@ public abstract class SimAbstractHypervisor( override val cpuActiveTime: Long get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() + get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong() override val cpuStealTime: Long get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() override val cpuLostTime: Long get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + + override fun flush() { + for (cpu in cpus) { + cpu.flush() + } + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt index 030d9c5f..63fee507 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorCounters.kt @@ -45,4 +45,9 @@ public interface SimHypervisorCounters { * The amount of CPU time (in milliseconds) that was lost due to interference between virtual machines. */ public val cpuLostTime: Long + + /** + * Flush the counter values. + */ + public fun flush() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index a7877546..5f198944 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -105,4 +105,14 @@ public interface FlowMultiplexer { * Clear the outputs of the multiplexer. */ public fun clearOutputs() + + /** + * Flush the counters of the multiplexer. + */ + public fun flushCounters() + + /** + * Flush the counters of the specified [input]. + */ + public fun flushCounters(input: FlowConsumer) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index eff111b8..7fe0c1b7 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -135,6 +135,10 @@ public class ForwardingFlowMultiplexer( clearInputs() } + override fun flushCounters() {} + + override fun flushCounters(input: FlowConsumer) {} + private var _lastConverge = Long.MAX_VALUE override fun onConverge(now: Long, delta: Long) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 3d26efda..7ee4d326 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer( clearInputs() } + override fun flushCounters() { + scheduler.updateCounters(engine.clock.millis()) + } + + override fun flushCounters(input: FlowConsumer) { + (input as Input).doUpdateCounters(engine.clock.millis()) + } + /** * Helper class containing the scheduler state. */ @@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer( * Flag to indicate that the scheduler is active. */ private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE /** * The last convergence timestamp and the input. @@ -324,14 +331,9 @@ public class MaxMinFlowMultiplexer( * Synchronously run the scheduler of the multiplexer. */ fun runScheduler(now: Long): Long { - val lastSchedulerCycle = _lastSchedulerCycle - _lastSchedulerCycle = now - - val delta = max(0, now - lastSchedulerCycle) - return try { _schedulerActive = true - doRunScheduler(now, delta) + doRunScheduler(now) } finally { _schedulerActive = false } @@ -384,14 +386,14 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(now: Long, delta: Long): Long { + private fun doRunScheduler(now: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs var inputArray = _inputArray var inputSize = _inputArray.size // Update the counters of the scheduler - updateCounters(delta) + updateCounters(now) // If there is no work yet, mark the inputs as idle. if (inputSize == 0) { @@ -475,14 +477,19 @@ public class MaxMinFlowMultiplexer( * The previous capacity of the multiplexer. */ private var _previousCapacity = 0.0 + private var _previousUpdate = Long.MIN_VALUE /** * Update the counters of the scheduler. */ - private fun updateCounters(delta: Long) { + fun updateCounters(now: Long) { val previousCapacity = _previousCapacity _previousCapacity = capacity + val previousUpdate = _previousUpdate + _previousUpdate = now + + val delta = (now - previousUpdate).coerceAtLeast(0) if (delta <= 0) { return } @@ -643,7 +650,7 @@ public class MaxMinFlowMultiplexer( delta: Long, rate: Double ) { - doUpdateCounters(delta) + doUpdateCounters(now) val allowed = min(rate, capacity) limit = rate @@ -654,7 +661,7 @@ public class MaxMinFlowMultiplexer( } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - doUpdateCounters(delta) + doUpdateCounters(now) limit = 0.0 actualRate = 0.0 @@ -672,10 +679,19 @@ public class MaxMinFlowMultiplexer( /* Comparable */ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) + /** + * The timestamp that the counters where last updated. + */ + private var _lastUpdate = Long.MIN_VALUE + /** * Helper method to update the flow counters of the multiplexer. */ - private fun doUpdateCounters(delta: Long) { + fun doUpdateCounters(now: Long) { + val lastUpdate = _lastUpdate + _lastUpdate = now + + val delta = (now - lastUpdate).coerceAtLeast(0) if (delta <= 0L) { return } -- cgit v1.2.3