summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-01-11 13:53:07 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-18 16:53:52 +0100
commit841eaeb84a96cb1b20172b1ab293ebef0bb573a5 (patch)
tree1c744a0ad30d739de09bc5ffe7b1631cee925de3 /opendc-simulator/opendc-simulator-flow/src/main/kotlin/org
parent2615c1c3e6c3f79bc14386398bb1d14d65c17512 (diff)
fix(simulator): Flush results before accessing counters
This change updates the simulator implementation to flush the active progress when accessing the hypervisor counters. Previously, if the counters were accessed, while the mux or consumer was in progress, its counter values were not accurate.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main/kotlin/org')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt42
3 files changed, 43 insertions, 13 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index a7877546..5f198944 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -105,4 +105,14 @@ public interface FlowMultiplexer {
* Clear the outputs of the multiplexer.
*/
public fun clearOutputs()
+
+ /**
+ * Flush the counters of the multiplexer.
+ */
+ public fun flushCounters()
+
+ /**
+ * Flush the counters of the specified [input].
+ */
+ public fun flushCounters(input: FlowConsumer)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index eff111b8..7fe0c1b7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -135,6 +135,10 @@ public class ForwardingFlowMultiplexer(
clearInputs()
}
+ override fun flushCounters() {}
+
+ override fun flushCounters(input: FlowConsumer) {}
+
private var _lastConverge = Long.MAX_VALUE
override fun onConverge(now: Long, delta: Long) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 3d26efda..7ee4d326 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer(
clearInputs()
}
+ override fun flushCounters() {
+ scheduler.updateCounters(engine.clock.millis())
+ }
+
+ override fun flushCounters(input: FlowConsumer) {
+ (input as Input).doUpdateCounters(engine.clock.millis())
+ }
+
/**
* Helper class containing the scheduler state.
*/
@@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer(
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
- private var _lastSchedulerCycle = Long.MAX_VALUE
/**
* The last convergence timestamp and the input.
@@ -324,14 +331,9 @@ public class MaxMinFlowMultiplexer(
* Synchronously run the scheduler of the multiplexer.
*/
fun runScheduler(now: Long): Long {
- val lastSchedulerCycle = _lastSchedulerCycle
- _lastSchedulerCycle = now
-
- val delta = max(0, now - lastSchedulerCycle)
-
return try {
_schedulerActive = true
- doRunScheduler(now, delta)
+ doRunScheduler(now)
} finally {
_schedulerActive = false
}
@@ -384,14 +386,14 @@ public class MaxMinFlowMultiplexer(
*
* @return The deadline after which a new scheduling cycle should start.
*/
- private fun doRunScheduler(now: Long, delta: Long): Long {
+ private fun doRunScheduler(now: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
var inputArray = _inputArray
var inputSize = _inputArray.size
// Update the counters of the scheduler
- updateCounters(delta)
+ updateCounters(now)
// If there is no work yet, mark the inputs as idle.
if (inputSize == 0) {
@@ -475,14 +477,19 @@ public class MaxMinFlowMultiplexer(
* The previous capacity of the multiplexer.
*/
private var _previousCapacity = 0.0
+ private var _previousUpdate = Long.MIN_VALUE
/**
* Update the counters of the scheduler.
*/
- private fun updateCounters(delta: Long) {
+ fun updateCounters(now: Long) {
val previousCapacity = _previousCapacity
_previousCapacity = capacity
+ val previousUpdate = _previousUpdate
+ _previousUpdate = now
+
+ val delta = (now - previousUpdate).coerceAtLeast(0)
if (delta <= 0) {
return
}
@@ -643,7 +650,7 @@ public class MaxMinFlowMultiplexer(
delta: Long,
rate: Double
) {
- doUpdateCounters(delta)
+ doUpdateCounters(now)
val allowed = min(rate, capacity)
limit = rate
@@ -654,7 +661,7 @@ public class MaxMinFlowMultiplexer(
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- doUpdateCounters(delta)
+ doUpdateCounters(now)
limit = 0.0
actualRate = 0.0
@@ -673,9 +680,18 @@ public class MaxMinFlowMultiplexer(
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
/**
+ * The timestamp that the counters where last updated.
+ */
+ private var _lastUpdate = Long.MIN_VALUE
+
+ /**
* Helper method to update the flow counters of the multiplexer.
*/
- private fun doUpdateCounters(delta: Long) {
+ fun doUpdateCounters(now: Long) {
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
+
+ val delta = (now - lastUpdate).coerceAtLeast(0)
if (delta <= 0L) {
return
}