summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt10
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt4
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt42
3 files changed, 43 insertions, 13 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
index a7877546..5f198944 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt
@@ -105,4 +105,14 @@ public interface FlowMultiplexer {
* Clear the outputs of the multiplexer.
*/
public fun clearOutputs()
+
+ /**
+ * Flush the counters of the multiplexer.
+ */
+ public fun flushCounters()
+
+ /**
+ * Flush the counters of the specified [input].
+ */
+ public fun flushCounters(input: FlowConsumer)
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
index eff111b8..7fe0c1b7 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt
@@ -135,6 +135,10 @@ public class ForwardingFlowMultiplexer(
clearInputs()
}
+ override fun flushCounters() {}
+
+ override fun flushCounters(input: FlowConsumer) {}
+
private var _lastConverge = Long.MAX_VALUE
override fun onConverge(now: Long, delta: Long) {
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 3d26efda..7ee4d326 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -141,6 +141,14 @@ public class MaxMinFlowMultiplexer(
clearInputs()
}
+ override fun flushCounters() {
+ scheduler.updateCounters(engine.clock.millis())
+ }
+
+ override fun flushCounters(input: FlowConsumer) {
+ (input as Input).doUpdateCounters(engine.clock.millis())
+ }
+
/**
* Helper class containing the scheduler state.
*/
@@ -189,7 +197,6 @@ public class MaxMinFlowMultiplexer(
* Flag to indicate that the scheduler is active.
*/
private var _schedulerActive = false
- private var _lastSchedulerCycle = Long.MAX_VALUE
/**
* The last convergence timestamp and the input.
@@ -324,14 +331,9 @@ public class MaxMinFlowMultiplexer(
* Synchronously run the scheduler of the multiplexer.
*/
fun runScheduler(now: Long): Long {
- val lastSchedulerCycle = _lastSchedulerCycle
- _lastSchedulerCycle = now
-
- val delta = max(0, now - lastSchedulerCycle)
-
return try {
_schedulerActive = true
- doRunScheduler(now, delta)
+ doRunScheduler(now)
} finally {
_schedulerActive = false
}
@@ -384,14 +386,14 @@ public class MaxMinFlowMultiplexer(
*
* @return The deadline after which a new scheduling cycle should start.
*/
- private fun doRunScheduler(now: Long, delta: Long): Long {
+ private fun doRunScheduler(now: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
var inputArray = _inputArray
var inputSize = _inputArray.size
// Update the counters of the scheduler
- updateCounters(delta)
+ updateCounters(now)
// If there is no work yet, mark the inputs as idle.
if (inputSize == 0) {
@@ -475,14 +477,19 @@ public class MaxMinFlowMultiplexer(
* The previous capacity of the multiplexer.
*/
private var _previousCapacity = 0.0
+ private var _previousUpdate = Long.MIN_VALUE
/**
* Update the counters of the scheduler.
*/
- private fun updateCounters(delta: Long) {
+ fun updateCounters(now: Long) {
val previousCapacity = _previousCapacity
_previousCapacity = capacity
+ val previousUpdate = _previousUpdate
+ _previousUpdate = now
+
+ val delta = (now - previousUpdate).coerceAtLeast(0)
if (delta <= 0) {
return
}
@@ -643,7 +650,7 @@ public class MaxMinFlowMultiplexer(
delta: Long,
rate: Double
) {
- doUpdateCounters(delta)
+ doUpdateCounters(now)
val allowed = min(rate, capacity)
limit = rate
@@ -654,7 +661,7 @@ public class MaxMinFlowMultiplexer(
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
- doUpdateCounters(delta)
+ doUpdateCounters(now)
limit = 0.0
actualRate = 0.0
@@ -673,9 +680,18 @@ public class MaxMinFlowMultiplexer(
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
/**
+ * The timestamp that the counters where last updated.
+ */
+ private var _lastUpdate = Long.MIN_VALUE
+
+ /**
* Helper method to update the flow counters of the multiplexer.
*/
- private fun doUpdateCounters(delta: Long) {
+ fun doUpdateCounters(now: Long) {
+ val lastUpdate = _lastUpdate
+ _lastUpdate = now
+
+ val delta = (now - lastUpdate).coerceAtLeast(0)
if (delta <= 0L) {
return
}