diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main')
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt | 40 |
1 files changed, 29 insertions, 11 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 28743276..eab5b299 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -168,6 +168,11 @@ public class MaxMinFlowMultiplexer( private val _activeInputs = mutableListOf<Input>() /** + * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList]. + */ + private var _inputArray = emptyArray<Input>() + + /** * The active outputs registered with the scheduler. */ private val _activeOutputs = mutableListOf<Output>() @@ -194,6 +199,7 @@ public class MaxMinFlowMultiplexer( */ fun registerInput(input: Input) { _activeInputs.add(input) + _inputArray = _activeInputs.toTypedArray() val hasActivationOutput = activationOutput != null @@ -201,6 +207,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity + trigger(_clock.millis()) } @@ -213,6 +220,8 @@ public class MaxMinFlowMultiplexer( _lastConvergeInput = null } + _activeInputs.remove(input) + // Re-run scheduler to distribute new load trigger(now) } @@ -365,12 +374,14 @@ public class MaxMinFlowMultiplexer( private fun doRunScheduler(delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + var inputArray = _inputArray + var inputSize = _inputArray.size // Update the counters of the scheduler updateCounters(delta) // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { + if (inputSize == 0) { demand = 0.0 rate = 0.0 return Long.MAX_VALUE @@ -380,35 +391,42 @@ public class MaxMinFlowMultiplexer( var availableCapacity = capacity var deadline = Long.MAX_VALUE var demand = 0.0 + var shouldRebuild = false - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { + // Pull in the work of the inputs + for (i in 0 until inputSize) { + val input = inputArray[i] input.pullSync() - // Remove outputs that have finished + // Remove inputs that have finished if (!input.isActive) { input.actualRate = 0.0 - inputIterator.remove() + shouldRebuild = true } else { demand += input.limit deadline = min(deadline, input.deadline) } } + // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs` + if (shouldRebuild) { + inputArray = activeInputs.toTypedArray() + inputSize = inputArray.size + _inputArray = inputArray + } + val rate = if (demand > capacity) { // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the // constrained capacity across the inputs. // Sort in-place the inputs based on their pushed flow. // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + inputArray.sort() // Divide the available output capacity fairly over the inputs using max-min fair sharing - val size = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / (size - i) + for (i in 0 until inputSize) { + val input = inputArray[i] + val availableShare = availableCapacity / (inputSize - i) val grantedRate = min(input.allowedRate, availableShare) availableCapacity -= grantedRate |
