diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-05 11:05:23 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-05 16:56:44 +0200 |
| commit | 0ec821157767a630ff82f980f4990ec9d1b75573 (patch) | |
| tree | 576813b37f532726ee9116161952047e159b0f3e /opendc-simulator/opendc-simulator-flow/src | |
| parent | 7c260ab0b083488b8855f61648548a40401cf62e (diff) | |
refactor(simulator): Extract scheduler for max min multiplexer
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
3 files changed, 383 insertions, 247 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt index 939c5c98..97d56fff 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -35,9 +35,9 @@ internal const val ConnState = 0b11 // Mask for accessing the state of the flow */ internal const val ConnPulled = 1 shl 2 // The source should be pulled internal const val ConnPushed = 1 shl 3 // The source has pushed a value -internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active -internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending -internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary +internal const val ConnClose = 1 shl 4 // The connection should be closed +internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active +internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index c7a8c3de..f15d7fb0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -150,23 +150,17 @@ internal class FlowConsumerContextImpl( } override fun close() { - var flags = _flags + val flags = _flags if (flags and ConnState == ConnClosed) { return } - engine.batch { - // Mark the connection as closed and pulled (in order to converge) - flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled - _flags = flags - - if (flags and ConnUpdateActive == 0) { - val now = _clock.millis() - doStopSource(now) - - // FIX: Make sure the context converges - scheduleImmediate(now, flags) - } + // Toggle the close bit. In case no update is active, schedule a new update. + if (flags and ConnUpdateActive == 0) { + val now = _clock.millis() + scheduleImmediate(now, flags or ConnClose) + } else { + _flags = flags or ConnClose } } @@ -232,7 +226,7 @@ internal class FlowConsumerContextImpl( val deadline = _deadline val reachedDeadline = deadline == now - var newDeadline = deadline + var newDeadline: Long var hasUpdated = false try { @@ -259,9 +253,13 @@ internal class FlowConsumerContextImpl( deadline } + // Make the new deadline available for the consumer if it has changed + if (newDeadline != deadline) { + _deadline = newDeadline + } + // Push to the consumer if the rate of the source has changed (after a call to `push`) - val newState = flags and ConnState - if (newState == ConnActive && flags and ConnPushed != 0) { + if (flags and ConnPushed != 0) { val lastPush = _lastPush val delta = max(0, now - lastPush) @@ -274,17 +272,33 @@ internal class FlowConsumerContextImpl( // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags - } else if (newState == ConnClosed) { + } + + // Check whether the source or consumer have tried to close the connection + if (flags and ConnClose != 0) { hasUpdated = true // The source has called [FlowConnection.close], so clean up the connection doStopSource(now) + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + // We now also mark the connection as closed + flags = (_flags and ConnState.inv()) or ConnClosed + + _demand = 0.0 + newDeadline = Long.MAX_VALUE } } catch (cause: Throwable) { + hasUpdated = true + + // Clean up the connection + doFailSource(now, cause) + // Mark the connection as closed flags = (flags and ConnState.inv()) or ConnClosed - doFailSource(now, cause) + _demand = 0.0 + newDeadline = Long.MAX_VALUE } // Check whether the connection needs to be added to the visited queue. This is the case when: @@ -316,9 +330,6 @@ internal class FlowConsumerContextImpl( _timer } - // Set the new deadline and schedule a delayed update for that deadline - _deadline = newDeadline - // Check whether we need to schedule a new timer for this connection. That is the case when: // (1) The deadline is valid (not the maximum value) // (2) The connection is active @@ -355,8 +366,8 @@ internal class FlowConsumerContextImpl( // The connection is converging now, so unset the convergence pending flag _flags = flags and ConnConvergePending.inv() - // Call the source converge callback if it has enabled convergence and the connection is active - if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) { + // Call the source converge callback if it has enabled convergence + if (flags and ConnConvergeSource != 0) { val delta = max(0, now - _lastSourceConvergence) _lastSourceConvergence = now @@ -371,7 +382,13 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, now, delta) } } catch (cause: Throwable) { + // Invoke the finish callbacks doFailSource(now, cause) + + // Mark the connection as closed + _flags = (_flags and ConnState.inv()) or ConnClosed + _demand = 0.0 + _deadline = Long.MAX_VALUE } } @@ -386,10 +403,6 @@ internal class FlowConsumerContextImpl( doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) - return - } finally { - _deadline = Long.MAX_VALUE - _demand = 0.0 } } @@ -402,9 +415,6 @@ internal class FlowConsumerContextImpl( } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) - } finally { - _deadline = Long.MAX_VALUE - _demand = 0.0 } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 22f6516d..c6aa94e2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -38,7 +38,7 @@ import kotlin.math.min */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - private val parent: FlowConvergenceListener? = null, + parent: FlowConvergenceListener? = null, private val interferenceDomain: InterferenceDomain? = null ) : FlowMultiplexer { /** @@ -47,7 +47,6 @@ public class MaxMinFlowMultiplexer( override val inputs: Set<FlowConsumer> get() = _inputs private val _inputs = mutableSetOf<Input>() - private val _activeInputs = mutableListOf<Input>() /** * The outputs of the multiplexer. @@ -55,55 +54,38 @@ public class MaxMinFlowMultiplexer( override val outputs: Set<FlowSource> get() = _outputs private val _outputs = mutableSetOf<Output>() - private val _activeOutputs = mutableListOf<Output>() /** * The flow counters of this multiplexer. */ public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() + get() = scheduler.counters /** * The actual processing rate of the multiplexer. */ public override val rate: Double - get() = _rate - private var _rate = 0.0 + get() = scheduler.rate /** * The demanded processing rate of the input. */ public override val demand: Double - get() = _demand - private var _demand = 0.0 + get() = scheduler.demand /** * The capacity of the outputs. */ public override val capacity: Double - get() = _capacity - private var _capacity = 0.0 + get() = scheduler.capacity /** - * Flag to indicate that the scheduler is active. + * The [Scheduler] instance of this multiplexer. */ - private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE - - /** - * The last convergence timestamp and the input. - */ - private var _lastConverge: Long = Long.MIN_VALUE - private var _lastConvergeInput: Input? = null - - /** - * An [Output] that is used to activate the scheduler. - */ - private var _activationOutput: Output? = null + private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(_capacity, key) + val provider = Input(engine, scheduler, interferenceDomain, key) _inputs.add(provider) return provider } @@ -117,7 +99,7 @@ public class MaxMinFlowMultiplexer( } override fun newOutput(): FlowSource { - val output = Output() + val output = Output(scheduler) _outputs.add(output) return output } @@ -151,185 +133,330 @@ public class MaxMinFlowMultiplexer( } /** - * Trigger the scheduler of the multiplexer. - * - * @param now The current virtual timestamp of the simulation. + * Helper class containing the scheduler state. */ - private fun triggerScheduler(now: Long) { - if (_schedulerActive) { - // No need to trigger the scheduler in case it is already active - return - } + private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + /** + * The flow counters of this scheduler. + */ + @JvmField val counters = FlowCountersImpl() - val activationOutput = _activationOutput + /** + * The flow rate of the multiplexer. + */ + @JvmField var rate = 0.0 - // We can run the scheduler in two ways: - // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input - // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. - // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only - // a few inputs and little changes at the same timestamp. - // We always pick for option (1) unless there are no outputs available. - if (activationOutput != null) { - activationOutput.pull() - return - } else { - runScheduler(now) - } - } + /** + * The demand for the multiplexer. + */ + @JvmField var demand = 0.0 - /** - * Synchronously run the scheduler of the multiplexer. - */ - private fun runScheduler(now: Long): Long { - val lastSchedulerCycle = _lastSchedulerCycle - _lastSchedulerCycle = now + /** + * The capacity of the multiplexer. + */ + @JvmField var capacity = 0.0 + + /** + * An [Output] that is used to activate the scheduler. + */ + @JvmField var activationOutput: Output? = null - val delta = max(0, now - lastSchedulerCycle) + /** + * The active inputs registered with the scheduler. + */ + private val _activeInputs = mutableListOf<Input>() - return try { - _schedulerActive = true - doSchedule(delta) - } finally { - _schedulerActive = false + /** + * The active outputs registered with the scheduler. + */ + private val _activeOutputs = mutableListOf<Output>() + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null + + /** + * Register the specified [input] to this scheduler. + */ + fun registerInput(input: Input) { + _activeInputs.add(input) + + val hasActivationOutput = activationOutput != null + + // Disable timers and convergence of the source if one of the output manages it + input.shouldConsumerConverge = !hasActivationOutput + input.enableTimers = !hasActivationOutput + input.capacity = capacity + trigger(engine.clock.millis()) } - } - /** - * Schedule the inputs over the outputs. - * - * @return The deadline after which a new scheduling cycle should start. - */ - private fun doSchedule(delta: Long): Long { - val activeInputs = _activeInputs - val activeOutputs = _activeOutputs - - // Update the counters of the scheduler - updateCounters(delta) - - // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { - _demand = 0.0 - _rate = 0.0 - return Long.MAX_VALUE + /** + * De-register the specified [input] from this scheduler. + */ + fun deregisterInput(input: Input, now: Long) { + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == input) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + trigger(now) } - val capacity = _capacity - var availableCapacity = capacity + /** + * This method is invoked when one of the inputs converges. + */ + fun convergeInput(input: Input, now: Long) { - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { - input.pullSync() + val lastConverge = _lastConverge + val lastConvergeInput = _lastConvergeInput + val parent = parent + + if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) { + _lastConverge = now + _lastConvergeInput = input - // Remove outputs that have finished - if (!input.isActive) { - input.actualRate = 0.0 - inputIterator.remove() + parent.onConverge(now, max(0, now - lastConverge)) } } - var demand = 0.0 - var deadline = Long.MAX_VALUE + /** + * Register the specified [output] to this scheduler. + */ + fun registerOutput(output: Output) { + _activeOutputs.add(output) - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + updateCapacity() + updateActivationOutput() + } - // Divide the available output capacity fairly over the inputs using max-min fair sharing - var remaining = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / remaining-- - val grantedRate = min(input.allowedRate, availableShare) + /** + * De-register the specified [output] from this scheduler. + */ + fun deregisterOutput(output: Output, now: Long) { + _activeOutputs.remove(output) + updateCapacity() - demand += input.limit - deadline = min(deadline, input.deadline) - availableCapacity -= grantedRate + trigger(now) + } + + /** + * This method is invoked when one of the outputs converges. + */ + fun convergeOutput(output: Output, now: Long) { + val lastConverge = _lastConverge + val parent = parent + + if (parent != null) { + _lastConverge = now + + parent.onConverge(now, max(0, now - lastConverge)) + } - input.actualRate = grantedRate + if (!output.isActive) { + output.isActivationOutput = false + updateActivationOutput() + } } - val rate = capacity - availableCapacity + /** + * Trigger the scheduler of the multiplexer. + * + * @param now The current virtual timestamp of the simulation. + */ + fun trigger(now: Long) { + if (_schedulerActive) { + // No need to trigger the scheduler in case it is already active + return + } - _demand = demand - _rate = rate + val activationOutput = activationOutput - // Sort all consumers by their capacity - activeOutputs.sort() + // We can run the scheduler in two ways: + // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input + // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. + // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only + // a few inputs and little changes at the same timestamp. + // We always pick for option (1) unless there are no outputs available. + if (activationOutput != null) { + activationOutput.pull() + return + } else { + runScheduler(now) + } + } + + /** + * Synchronously run the scheduler of the multiplexer. + */ + fun runScheduler(now: Long): Long { + val lastSchedulerCycle = _lastSchedulerCycle + _lastSchedulerCycle = now - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction + val delta = max(0, now - lastSchedulerCycle) - output.push(grantedSpeed) + return try { + _schedulerActive = true + doRunScheduler(delta) + } finally { + _schedulerActive = false + } } - return deadline - } + /** + * Recompute the capacity of the multiplexer. + */ + fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) - /** - * Recompute the capacity of the multiplexer. - */ - private fun updateCapacity() { - val newCapacity = _activeOutputs.sumOf(Output::capacity) + // No-op if the capacity is unchanged + if (capacity == newCapacity) { + return + } - // No-op if the capacity is unchanged - if (_capacity == newCapacity) { - return + capacity = newCapacity + + for (input in _activeInputs) { + input.capacity = newCapacity + } } - _capacity = newCapacity + /** + * Updates the output that is used for scheduler activation. + */ + private fun updateActivationOutput() { + val output = _activeOutputs.firstOrNull() + activationOutput = output - for (input in _inputs) { - input.capacity = newCapacity + if (output != null) { + output.isActivationOutput = true + } + + val hasActivationOutput = output != null + + for (input in _activeInputs) { + input.shouldConsumerConverge = !hasActivationOutput + input.enableTimers = !hasActivationOutput + } } - } - /** - * The previous capacity of the multiplexer. - */ - private var _previousCapacity = 0.0 + /** + * Schedule the inputs over the outputs. + * + * @return The deadline after which a new scheduling cycle should start. + */ + private fun doRunScheduler(delta: Long): Long { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // Update the counters of the scheduler + updateCounters(delta) + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + demand = 0.0 + rate = 0.0 + return Long.MAX_VALUE + } - /** - * Update the counters of the scheduler. - */ - private fun updateCounters(delta: Long) { - val previousCapacity = _previousCapacity - _previousCapacity = _capacity + val capacity = capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pullSync() + + // Remove outputs that have finished + if (!input.isActive) { + input.actualRate = 0.0 + inputIterator.remove() + } + } - if (delta <= 0) { - return + var demand = 0.0 + var deadline = Long.MAX_VALUE + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + val size = activeInputs.size + for (i in activeInputs.indices) { + val input = activeInputs[i] + val availableShare = availableCapacity / (size - i) + val grantedRate = min(input.allowedRate, availableShare) + + demand += input.limit + deadline = min(deadline, input.deadline) + availableCapacity -= grantedRate + + input.actualRate = grantedRate + } + + val rate = capacity - availableCapacity + + this.demand = demand + this.rate = rate + + // Sort all consumers by their capacity + activeOutputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (i in activeOutputs.indices) { + val output = activeOutputs[i] + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + + return deadline } - val deltaS = delta / 1000.0 + /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 - _counters.demand += _demand * deltaS - _counters.actual += _rate * deltaS - _counters.remaining += (previousCapacity - _rate) * deltaS - } + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = capacity - /** - * Updates the output that is used for scheduler activation. - */ - private fun updateActivationOutput() { - val output = _activeOutputs.firstOrNull() - _activationOutput = output + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 - for (input in _activeInputs) { - input.enableTimers = output == null + counters.demand += demand * deltaS + counters.actual += rate * deltaS + counters.remaining += (previousCapacity - rate) * deltaS } } /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ - private inner class Input(capacity: Double, val key: InterferenceKey?) : - AbstractFlowConsumer(engine, capacity), - FlowConsumerLogic, - Comparable<Input> { + private class Input( + engine: FlowEngine, + private val scheduler: Scheduler, + private val interferenceDomain: InterferenceDomain?, + @JvmField val key: InterferenceKey? + ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> { /** * The requested limit. */ @@ -341,18 +468,18 @@ public class MaxMinFlowMultiplexer( @JvmField var actualRate: Double = 0.0 /** - * The processing rate that is allowed by the model constraints. - */ - val allowedRate: Double - get() = min(capacity, limit) - - /** * The deadline of the input. */ val deadline: Long get() = ctx?.deadline ?: Long.MAX_VALUE /** + * The processing rate that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + + /** * A flag to enable timers for the input. */ var enableTimers: Boolean = true @@ -362,14 +489,18 @@ public class MaxMinFlowMultiplexer( } /** - * A flag to indicate that the input is closed. + * A flag to control whether the input should converge. */ - private var _isClosed: Boolean = false + var shouldConsumerConverge: Boolean = true + set(value) { + field = value + ctx?.shouldConsumerConverge = value + } /** - * The interference domain this input belongs to. + * A flag to indicate that the input is closed. */ - private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + private var _isClosed: Boolean = false /** * Close the input. @@ -386,17 +517,8 @@ public class MaxMinFlowMultiplexer( override fun start(ctx: FlowConsumerContext) { check(!_isClosed) { "Cannot re-use closed input" } - - _activeInputs += this - - if (parent != null) { - ctx.shouldConsumerConverge = true - } - enableTimers = _activationOutput == null // Disable timers of the source if one of the output manages it - + scheduler.registerInput(this) super.start(ctx) - - triggerScheduler(engine.clock.millis()) } /* FlowConsumerLogic */ @@ -411,19 +533,7 @@ public class MaxMinFlowMultiplexer( actualRate = 0.0 limit = rate - triggerScheduler(now) - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - val lastConverge = _lastConverge - val parent = parent - - if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { - _lastConverge = now - _lastConvergeInput = this - - parent.onConverge(now, max(0, now - lastConverge)) - } + scheduler.trigger(now) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -432,18 +542,16 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 - // Assign a new input responsible for handling the convergence events - if (_lastConvergeInput == this) { - _lastConvergeInput = null - } - - // Re-run scheduler to distribute new load - triggerScheduler(now) + scheduler.deregisterInput(this, now) // BUG: Cancel the connection so that `ctx` is set to `null` cancel() } + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + scheduler.convergeInput(this, now) + } + /* Comparable */ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) @@ -464,7 +572,7 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / _capacity + val load = scheduler.rate / scheduler.capacity interferenceDomain.apply(key, load) } else { 1.0 @@ -477,14 +585,14 @@ public class MaxMinFlowMultiplexer( updateCounters(demand, actual, remaining) - _counters.interference += actual * max(0.0, 1 - perfScore) + scheduler.counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output : FlowSource, Comparable<Output> { + private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> { /** * The active [FlowConnection] of this source. */ @@ -496,6 +604,22 @@ public class MaxMinFlowMultiplexer( @JvmField var capacity: Double = 0.0 /** + * A flag to indicate that this output is the activation output. + */ + var isActivationOutput: Boolean + get() = _isActivationOutput + set(value) { + _isActivationOutput = value + _conn?.shouldSourceConverge = value + } + private var _isActivationOutput: Boolean = false + + /** + * A flag to indicate that the output is active. + */ + @JvmField var isActive = false + + /** * Push the specified rate to the consumer. */ fun push(rate: Double) { @@ -520,33 +644,29 @@ public class MaxMinFlowMultiplexer( assert(_conn == null) { "Source running concurrently" } _conn = conn capacity = conn.capacity - _activeOutputs.add(this) + isActive = true - updateCapacity() - updateActivationOutput() + scheduler.registerOutput(this) } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { _conn = null capacity = 0.0 - _activeOutputs.remove(this) + isActive = false - updateCapacity() - updateActivationOutput() - - triggerScheduler(now) + scheduler.deregisterOutput(this, now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity - updateCapacity() + scheduler.updateCapacity() } - return if (_activationOutput == this) { + return if (_isActivationOutput) { // If this output is the activation output, synchronously run the scheduler and return the new deadline - val deadline = runScheduler(now) + val deadline = scheduler.runScheduler(now) if (deadline == Long.MAX_VALUE) deadline else @@ -554,11 +674,17 @@ public class MaxMinFlowMultiplexer( } else { // Output is not the activation output, so trigger activation output and do not install timer for this // output (by returning `Long.MAX_VALUE`) - triggerScheduler(now) + scheduler.trigger(now) Long.MAX_VALUE } } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + if (_isActivationOutput) { + scheduler.convergeOutput(this, now) + } + } + override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) } } |
