summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt40
1 files changed, 29 insertions, 11 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 28743276..eab5b299 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -168,6 +168,11 @@ public class MaxMinFlowMultiplexer(
private val _activeInputs = mutableListOf<Input>()
/**
+ * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList].
+ */
+ private var _inputArray = emptyArray<Input>()
+
+ /**
* The active outputs registered with the scheduler.
*/
private val _activeOutputs = mutableListOf<Output>()
@@ -194,6 +199,7 @@ public class MaxMinFlowMultiplexer(
*/
fun registerInput(input: Input) {
_activeInputs.add(input)
+ _inputArray = _activeInputs.toTypedArray()
val hasActivationOutput = activationOutput != null
@@ -201,6 +207,7 @@ public class MaxMinFlowMultiplexer(
input.shouldConsumerConverge = !hasActivationOutput
input.enableTimers = !hasActivationOutput
input.capacity = capacity
+
trigger(_clock.millis())
}
@@ -213,6 +220,8 @@ public class MaxMinFlowMultiplexer(
_lastConvergeInput = null
}
+ _activeInputs.remove(input)
+
// Re-run scheduler to distribute new load
trigger(now)
}
@@ -365,12 +374,14 @@ public class MaxMinFlowMultiplexer(
private fun doRunScheduler(delta: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
+ var inputArray = _inputArray
+ var inputSize = _inputArray.size
// Update the counters of the scheduler
updateCounters(delta)
// If there is no work yet, mark the inputs as idle.
- if (activeInputs.isEmpty()) {
+ if (inputSize == 0) {
demand = 0.0
rate = 0.0
return Long.MAX_VALUE
@@ -380,35 +391,42 @@ public class MaxMinFlowMultiplexer(
var availableCapacity = capacity
var deadline = Long.MAX_VALUE
var demand = 0.0
+ var shouldRebuild = false
- // Pull in the work of the outputs
- val inputIterator = activeInputs.listIterator()
- for (input in inputIterator) {
+ // Pull in the work of the inputs
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
input.pullSync()
- // Remove outputs that have finished
+ // Remove inputs that have finished
if (!input.isActive) {
input.actualRate = 0.0
- inputIterator.remove()
+ shouldRebuild = true
} else {
demand += input.limit
deadline = min(deadline, input.deadline)
}
}
+ // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs`
+ if (shouldRebuild) {
+ inputArray = activeInputs.toTypedArray()
+ inputSize = inputArray.size
+ _inputArray = inputArray
+ }
+
val rate = if (demand > capacity) {
// If the demand is higher than the capacity, we need use max-min fair sharing to distribute the
// constrained capacity across the inputs.
// Sort in-place the inputs based on their pushed flow.
// Profiling shows that it is faster than maintaining some kind of sorted set.
- activeInputs.sort()
+ inputArray.sort()
// Divide the available output capacity fairly over the inputs using max-min fair sharing
- val size = activeInputs.size
- for (i in activeInputs.indices) {
- val input = activeInputs[i]
- val availableShare = availableCapacity / (size - i)
+ for (i in 0 until inputSize) {
+ val input = inputArray[i]
+ val availableShare = availableCapacity / (inputSize - i)
val grantedRate = min(input.allowedRate, availableShare)
availableCapacity -= grantedRate