summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt60
1 files changed, 39 insertions, 21 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 97059e93..9131eb54 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -372,6 +372,8 @@ public class MaxMinFlowMultiplexer(
val capacity = capacity
var availableCapacity = capacity
+ var deadline = Long.MAX_VALUE
+ var demand = 0.0
// Pull in the work of the outputs
val inputIterator = activeInputs.listIterator()
@@ -382,32 +384,36 @@ public class MaxMinFlowMultiplexer(
if (!input.isActive) {
input.actualRate = 0.0
inputIterator.remove()
+ } else {
+ demand += input.limit
+ deadline = min(deadline, input.deadline)
}
}
- var demand = 0.0
- var deadline = Long.MAX_VALUE
+ val rate = if (demand > capacity) {
+ // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
+ // constrained capacity across the inputs.
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- activeInputs.sort()
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeInputs.sort()
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- val size = activeInputs.size
- for (i in activeInputs.indices) {
- val input = activeInputs[i]
- val availableShare = availableCapacity / (size - i)
- val grantedRate = min(input.allowedRate, availableShare)
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ val size = activeInputs.size
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
+ val availableShare = availableCapacity / (size - i)
+ val grantedRate = min(input.allowedRate, availableShare)
- demand += input.limit
- deadline = min(deadline, input.deadline)
- availableCapacity -= grantedRate
+ availableCapacity -= grantedRate
+ input.actualRate = grantedRate
+ }
- input.actualRate = grantedRate
+ capacity - availableCapacity
+ } else {
+ demand
}
- val rate = capacity - availableCapacity
-
this.demand = demand
this.rate = rate
@@ -468,16 +474,25 @@ public class MaxMinFlowMultiplexer(
@JvmField var actualRate: Double = 0.0
/**
+ * The processing rate that is allowed by the model constraints.
+ */
+ @JvmField var allowedRate: Double = 0.0
+
+ /**
* The deadline of the input.
*/
val deadline: Long
get() = ctx?.deadline ?: Long.MAX_VALUE
/**
- * The processing rate that is allowed by the model constraints.
+ * The capacity of the input.
*/
- val allowedRate: Double
- get() = min(capacity, limit)
+ override var capacity: Double
+ get() = super.capacity
+ set(value) {
+ allowedRate = min(limit, value)
+ super.capacity = value
+ }
/**
* A flag to enable timers for the input.
@@ -530,8 +545,10 @@ public class MaxMinFlowMultiplexer(
) {
doUpdateCounters(delta)
- actualRate = 0.0
+ val allowed = min(rate, capacity)
limit = rate
+ actualRate = allowed
+ allowedRate = allowed
scheduler.trigger(now)
}
@@ -541,6 +558,7 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
+ allowedRate = 0.0
scheduler.deregisterInput(this, now)