summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-05 11:05:23 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-05 16:56:44 +0200
commit0ec821157767a630ff82f980f4990ec9d1b75573 (patch)
tree576813b37f532726ee9116161952047e159b0f3e /opendc-simulator/opendc-simulator-flow/src
parent7c260ab0b083488b8855f61648548a40401cf62e (diff)
refactor(simulator): Extract scheduler for max min multiplexer
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt6
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt70
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt554
3 files changed, 383 insertions, 247 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
index 939c5c98..97d56fff 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
@@ -35,9 +35,9 @@ internal const val ConnState = 0b11 // Mask for accessing the state of the flow
*/
internal const val ConnPulled = 1 shl 2 // The source should be pulled
internal const val ConnPushed = 1 shl 3 // The source has pushed a value
-internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active
-internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending
-internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary
+internal const val ConnClose = 1 shl 4 // The connection should be closed
+internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active
+internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending
internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index c7a8c3de..f15d7fb0 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -150,23 +150,17 @@ internal class FlowConsumerContextImpl(
}
override fun close() {
- var flags = _flags
+ val flags = _flags
if (flags and ConnState == ConnClosed) {
return
}
- engine.batch {
- // Mark the connection as closed and pulled (in order to converge)
- flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled
- _flags = flags
-
- if (flags and ConnUpdateActive == 0) {
- val now = _clock.millis()
- doStopSource(now)
-
- // FIX: Make sure the context converges
- scheduleImmediate(now, flags)
- }
+ // Toggle the close bit. In case no update is active, schedule a new update.
+ if (flags and ConnUpdateActive == 0) {
+ val now = _clock.millis()
+ scheduleImmediate(now, flags or ConnClose)
+ } else {
+ _flags = flags or ConnClose
}
}
@@ -232,7 +226,7 @@ internal class FlowConsumerContextImpl(
val deadline = _deadline
val reachedDeadline = deadline == now
- var newDeadline = deadline
+ var newDeadline: Long
var hasUpdated = false
try {
@@ -259,9 +253,13 @@ internal class FlowConsumerContextImpl(
deadline
}
+ // Make the new deadline available for the consumer if it has changed
+ if (newDeadline != deadline) {
+ _deadline = newDeadline
+ }
+
// Push to the consumer if the rate of the source has changed (after a call to `push`)
- val newState = flags and ConnState
- if (newState == ConnActive && flags and ConnPushed != 0) {
+ if (flags and ConnPushed != 0) {
val lastPush = _lastPush
val delta = max(0, now - lastPush)
@@ -274,17 +272,33 @@ internal class FlowConsumerContextImpl(
// IMPORTANT: Re-fetch the flags after the callback might have changed those
flags = _flags
- } else if (newState == ConnClosed) {
+ }
+
+ // Check whether the source or consumer have tried to close the connection
+ if (flags and ConnClose != 0) {
hasUpdated = true
// The source has called [FlowConnection.close], so clean up the connection
doStopSource(now)
+
+ // IMPORTANT: Re-fetch the flags after the callback might have changed those
+ // We now also mark the connection as closed
+ flags = (_flags and ConnState.inv()) or ConnClosed
+
+ _demand = 0.0
+ newDeadline = Long.MAX_VALUE
}
} catch (cause: Throwable) {
+ hasUpdated = true
+
+ // Clean up the connection
+ doFailSource(now, cause)
+
// Mark the connection as closed
flags = (flags and ConnState.inv()) or ConnClosed
- doFailSource(now, cause)
+ _demand = 0.0
+ newDeadline = Long.MAX_VALUE
}
// Check whether the connection needs to be added to the visited queue. This is the case when:
@@ -316,9 +330,6 @@ internal class FlowConsumerContextImpl(
_timer
}
- // Set the new deadline and schedule a delayed update for that deadline
- _deadline = newDeadline
-
// Check whether we need to schedule a new timer for this connection. That is the case when:
// (1) The deadline is valid (not the maximum value)
// (2) The connection is active
@@ -355,8 +366,8 @@ internal class FlowConsumerContextImpl(
// The connection is converging now, so unset the convergence pending flag
_flags = flags and ConnConvergePending.inv()
- // Call the source converge callback if it has enabled convergence and the connection is active
- if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) {
+ // Call the source converge callback if it has enabled convergence
+ if (flags and ConnConvergeSource != 0) {
val delta = max(0, now - _lastSourceConvergence)
_lastSourceConvergence = now
@@ -371,7 +382,13 @@ internal class FlowConsumerContextImpl(
logic.onConverge(this, now, delta)
}
} catch (cause: Throwable) {
+ // Invoke the finish callbacks
doFailSource(now, cause)
+
+ // Mark the connection as closed
+ _flags = (_flags and ConnState.inv()) or ConnClosed
+ _demand = 0.0
+ _deadline = Long.MAX_VALUE
}
}
@@ -386,10 +403,6 @@ internal class FlowConsumerContextImpl(
doFinishConsumer(now, null)
} catch (cause: Throwable) {
doFinishConsumer(now, cause)
- return
- } finally {
- _deadline = Long.MAX_VALUE
- _demand = 0.0
}
}
@@ -402,9 +415,6 @@ internal class FlowConsumerContextImpl(
} catch (e: Throwable) {
e.addSuppressed(cause)
doFinishConsumer(now, e)
- } finally {
- _deadline = Long.MAX_VALUE
- _demand = 0.0
}
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 22f6516d..c6aa94e2 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -38,7 +38,7 @@ import kotlin.math.min
*/
public class MaxMinFlowMultiplexer(
private val engine: FlowEngine,
- private val parent: FlowConvergenceListener? = null,
+ parent: FlowConvergenceListener? = null,
private val interferenceDomain: InterferenceDomain? = null
) : FlowMultiplexer {
/**
@@ -47,7 +47,6 @@ public class MaxMinFlowMultiplexer(
override val inputs: Set<FlowConsumer>
get() = _inputs
private val _inputs = mutableSetOf<Input>()
- private val _activeInputs = mutableListOf<Input>()
/**
* The outputs of the multiplexer.
@@ -55,55 +54,38 @@ public class MaxMinFlowMultiplexer(
override val outputs: Set<FlowSource>
get() = _outputs
private val _outputs = mutableSetOf<Output>()
- private val _activeOutputs = mutableListOf<Output>()
/**
* The flow counters of this multiplexer.
*/
public override val counters: FlowCounters
- get() = _counters
- private val _counters = FlowCountersImpl()
+ get() = scheduler.counters
/**
* The actual processing rate of the multiplexer.
*/
public override val rate: Double
- get() = _rate
- private var _rate = 0.0
+ get() = scheduler.rate
/**
* The demanded processing rate of the input.
*/
public override val demand: Double
- get() = _demand
- private var _demand = 0.0
+ get() = scheduler.demand
/**
* The capacity of the outputs.
*/
public override val capacity: Double
- get() = _capacity
- private var _capacity = 0.0
+ get() = scheduler.capacity
/**
- * Flag to indicate that the scheduler is active.
+ * The [Scheduler] instance of this multiplexer.
*/
- private var _schedulerActive = false
- private var _lastSchedulerCycle = Long.MAX_VALUE
-
- /**
- * The last convergence timestamp and the input.
- */
- private var _lastConverge: Long = Long.MIN_VALUE
- private var _lastConvergeInput: Input? = null
-
- /**
- * An [Output] that is used to activate the scheduler.
- */
- private var _activationOutput: Output? = null
+ private val scheduler = Scheduler(engine, parent)
override fun newInput(key: InterferenceKey?): FlowConsumer {
- val provider = Input(_capacity, key)
+ val provider = Input(engine, scheduler, interferenceDomain, key)
_inputs.add(provider)
return provider
}
@@ -117,7 +99,7 @@ public class MaxMinFlowMultiplexer(
}
override fun newOutput(): FlowSource {
- val output = Output()
+ val output = Output(scheduler)
_outputs.add(output)
return output
}
@@ -151,185 +133,330 @@ public class MaxMinFlowMultiplexer(
}
/**
- * Trigger the scheduler of the multiplexer.
- *
- * @param now The current virtual timestamp of the simulation.
+ * Helper class containing the scheduler state.
*/
- private fun triggerScheduler(now: Long) {
- if (_schedulerActive) {
- // No need to trigger the scheduler in case it is already active
- return
- }
+ private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) {
+ /**
+ * The flow counters of this scheduler.
+ */
+ @JvmField val counters = FlowCountersImpl()
- val activationOutput = _activationOutput
+ /**
+ * The flow rate of the multiplexer.
+ */
+ @JvmField var rate = 0.0
- // We can run the scheduler in two ways:
- // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
- // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
- // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
- // a few inputs and little changes at the same timestamp.
- // We always pick for option (1) unless there are no outputs available.
- if (activationOutput != null) {
- activationOutput.pull()
- return
- } else {
- runScheduler(now)
- }
- }
+ /**
+ * The demand for the multiplexer.
+ */
+ @JvmField var demand = 0.0
- /**
- * Synchronously run the scheduler of the multiplexer.
- */
- private fun runScheduler(now: Long): Long {
- val lastSchedulerCycle = _lastSchedulerCycle
- _lastSchedulerCycle = now
+ /**
+ * The capacity of the multiplexer.
+ */
+ @JvmField var capacity = 0.0
+
+ /**
+ * An [Output] that is used to activate the scheduler.
+ */
+ @JvmField var activationOutput: Output? = null
- val delta = max(0, now - lastSchedulerCycle)
+ /**
+ * The active inputs registered with the scheduler.
+ */
+ private val _activeInputs = mutableListOf<Input>()
- return try {
- _schedulerActive = true
- doSchedule(delta)
- } finally {
- _schedulerActive = false
+ /**
+ * The active outputs registered with the scheduler.
+ */
+ private val _activeOutputs = mutableListOf<Output>()
+
+ /**
+ * Flag to indicate that the scheduler is active.
+ */
+ private var _schedulerActive = false
+ private var _lastSchedulerCycle = Long.MAX_VALUE
+
+ /**
+ * The last convergence timestamp and the input.
+ */
+ private var _lastConverge: Long = Long.MIN_VALUE
+ private var _lastConvergeInput: Input? = null
+
+ /**
+ * Register the specified [input] to this scheduler.
+ */
+ fun registerInput(input: Input) {
+ _activeInputs.add(input)
+
+ val hasActivationOutput = activationOutput != null
+
+ // Disable timers and convergence of the source if one of the output manages it
+ input.shouldConsumerConverge = !hasActivationOutput
+ input.enableTimers = !hasActivationOutput
+ input.capacity = capacity
+ trigger(engine.clock.millis())
}
- }
- /**
- * Schedule the inputs over the outputs.
- *
- * @return The deadline after which a new scheduling cycle should start.
- */
- private fun doSchedule(delta: Long): Long {
- val activeInputs = _activeInputs
- val activeOutputs = _activeOutputs
-
- // Update the counters of the scheduler
- updateCounters(delta)
-
- // If there is no work yet, mark the inputs as idle.
- if (activeInputs.isEmpty()) {
- _demand = 0.0
- _rate = 0.0
- return Long.MAX_VALUE
+ /**
+ * De-register the specified [input] from this scheduler.
+ */
+ fun deregisterInput(input: Input, now: Long) {
+ // Assign a new input responsible for handling the convergence events
+ if (_lastConvergeInput == input) {
+ _lastConvergeInput = null
+ }
+
+ // Re-run scheduler to distribute new load
+ trigger(now)
}
- val capacity = _capacity
- var availableCapacity = capacity
+ /**
+ * This method is invoked when one of the inputs converges.
+ */
+ fun convergeInput(input: Input, now: Long) {
- // Pull in the work of the outputs
- val inputIterator = activeInputs.listIterator()
- for (input in inputIterator) {
- input.pullSync()
+ val lastConverge = _lastConverge
+ val lastConvergeInput = _lastConvergeInput
+ val parent = parent
+
+ if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) {
+ _lastConverge = now
+ _lastConvergeInput = input
- // Remove outputs that have finished
- if (!input.isActive) {
- input.actualRate = 0.0
- inputIterator.remove()
+ parent.onConverge(now, max(0, now - lastConverge))
}
}
- var demand = 0.0
- var deadline = Long.MAX_VALUE
+ /**
+ * Register the specified [output] to this scheduler.
+ */
+ fun registerOutput(output: Output) {
+ _activeOutputs.add(output)
- // Sort in-place the inputs based on their pushed flow.
- // Profiling shows that it is faster than maintaining some kind of sorted set.
- activeInputs.sort()
+ updateCapacity()
+ updateActivationOutput()
+ }
- // Divide the available output capacity fairly over the inputs using max-min fair sharing
- var remaining = activeInputs.size
- for (i in activeInputs.indices) {
- val input = activeInputs[i]
- val availableShare = availableCapacity / remaining--
- val grantedRate = min(input.allowedRate, availableShare)
+ /**
+ * De-register the specified [output] from this scheduler.
+ */
+ fun deregisterOutput(output: Output, now: Long) {
+ _activeOutputs.remove(output)
+ updateCapacity()
- demand += input.limit
- deadline = min(deadline, input.deadline)
- availableCapacity -= grantedRate
+ trigger(now)
+ }
+
+ /**
+ * This method is invoked when one of the outputs converges.
+ */
+ fun convergeOutput(output: Output, now: Long) {
+ val lastConverge = _lastConverge
+ val parent = parent
+
+ if (parent != null) {
+ _lastConverge = now
+
+ parent.onConverge(now, max(0, now - lastConverge))
+ }
- input.actualRate = grantedRate
+ if (!output.isActive) {
+ output.isActivationOutput = false
+ updateActivationOutput()
+ }
}
- val rate = capacity - availableCapacity
+ /**
+ * Trigger the scheduler of the multiplexer.
+ *
+ * @param now The current virtual timestamp of the simulation.
+ */
+ fun trigger(now: Long) {
+ if (_schedulerActive) {
+ // No need to trigger the scheduler in case it is already active
+ return
+ }
- _demand = demand
- _rate = rate
+ val activationOutput = activationOutput
- // Sort all consumers by their capacity
- activeOutputs.sort()
+ // We can run the scheduler in two ways:
+ // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
+ // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
+ // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
+ // a few inputs and little changes at the same timestamp.
+ // We always pick for option (1) unless there are no outputs available.
+ if (activationOutput != null) {
+ activationOutput.pull()
+ return
+ } else {
+ runScheduler(now)
+ }
+ }
+
+ /**
+ * Synchronously run the scheduler of the multiplexer.
+ */
+ fun runScheduler(now: Long): Long {
+ val lastSchedulerCycle = _lastSchedulerCycle
+ _lastSchedulerCycle = now
- // Divide the requests over the available capacity of the input resources fairly
- for (i in activeOutputs.indices) {
- val output = activeOutputs[i]
- val inputCapacity = output.capacity
- val fraction = inputCapacity / capacity
- val grantedSpeed = rate * fraction
+ val delta = max(0, now - lastSchedulerCycle)
- output.push(grantedSpeed)
+ return try {
+ _schedulerActive = true
+ doRunScheduler(delta)
+ } finally {
+ _schedulerActive = false
+ }
}
- return deadline
- }
+ /**
+ * Recompute the capacity of the multiplexer.
+ */
+ fun updateCapacity() {
+ val newCapacity = _activeOutputs.sumOf(Output::capacity)
- /**
- * Recompute the capacity of the multiplexer.
- */
- private fun updateCapacity() {
- val newCapacity = _activeOutputs.sumOf(Output::capacity)
+ // No-op if the capacity is unchanged
+ if (capacity == newCapacity) {
+ return
+ }
- // No-op if the capacity is unchanged
- if (_capacity == newCapacity) {
- return
+ capacity = newCapacity
+
+ for (input in _activeInputs) {
+ input.capacity = newCapacity
+ }
}
- _capacity = newCapacity
+ /**
+ * Updates the output that is used for scheduler activation.
+ */
+ private fun updateActivationOutput() {
+ val output = _activeOutputs.firstOrNull()
+ activationOutput = output
- for (input in _inputs) {
- input.capacity = newCapacity
+ if (output != null) {
+ output.isActivationOutput = true
+ }
+
+ val hasActivationOutput = output != null
+
+ for (input in _activeInputs) {
+ input.shouldConsumerConverge = !hasActivationOutput
+ input.enableTimers = !hasActivationOutput
+ }
}
- }
- /**
- * The previous capacity of the multiplexer.
- */
- private var _previousCapacity = 0.0
+ /**
+ * Schedule the inputs over the outputs.
+ *
+ * @return The deadline after which a new scheduling cycle should start.
+ */
+ private fun doRunScheduler(delta: Long): Long {
+ val activeInputs = _activeInputs
+ val activeOutputs = _activeOutputs
+
+ // Update the counters of the scheduler
+ updateCounters(delta)
+
+ // If there is no work yet, mark the inputs as idle.
+ if (activeInputs.isEmpty()) {
+ demand = 0.0
+ rate = 0.0
+ return Long.MAX_VALUE
+ }
- /**
- * Update the counters of the scheduler.
- */
- private fun updateCounters(delta: Long) {
- val previousCapacity = _previousCapacity
- _previousCapacity = _capacity
+ val capacity = capacity
+ var availableCapacity = capacity
+
+ // Pull in the work of the outputs
+ val inputIterator = activeInputs.listIterator()
+ for (input in inputIterator) {
+ input.pullSync()
+
+ // Remove outputs that have finished
+ if (!input.isActive) {
+ input.actualRate = 0.0
+ inputIterator.remove()
+ }
+ }
- if (delta <= 0) {
- return
+ var demand = 0.0
+ var deadline = Long.MAX_VALUE
+
+ // Sort in-place the inputs based on their pushed flow.
+ // Profiling shows that it is faster than maintaining some kind of sorted set.
+ activeInputs.sort()
+
+ // Divide the available output capacity fairly over the inputs using max-min fair sharing
+ val size = activeInputs.size
+ for (i in activeInputs.indices) {
+ val input = activeInputs[i]
+ val availableShare = availableCapacity / (size - i)
+ val grantedRate = min(input.allowedRate, availableShare)
+
+ demand += input.limit
+ deadline = min(deadline, input.deadline)
+ availableCapacity -= grantedRate
+
+ input.actualRate = grantedRate
+ }
+
+ val rate = capacity - availableCapacity
+
+ this.demand = demand
+ this.rate = rate
+
+ // Sort all consumers by their capacity
+ activeOutputs.sort()
+
+ // Divide the requests over the available capacity of the input resources fairly
+ for (i in activeOutputs.indices) {
+ val output = activeOutputs[i]
+ val inputCapacity = output.capacity
+ val fraction = inputCapacity / capacity
+ val grantedSpeed = rate * fraction
+
+ output.push(grantedSpeed)
+ }
+
+ return deadline
}
- val deltaS = delta / 1000.0
+ /**
+ * The previous capacity of the multiplexer.
+ */
+ private var _previousCapacity = 0.0
- _counters.demand += _demand * deltaS
- _counters.actual += _rate * deltaS
- _counters.remaining += (previousCapacity - _rate) * deltaS
- }
+ /**
+ * Update the counters of the scheduler.
+ */
+ private fun updateCounters(delta: Long) {
+ val previousCapacity = _previousCapacity
+ _previousCapacity = capacity
- /**
- * Updates the output that is used for scheduler activation.
- */
- private fun updateActivationOutput() {
- val output = _activeOutputs.firstOrNull()
- _activationOutput = output
+ if (delta <= 0) {
+ return
+ }
+
+ val deltaS = delta / 1000.0
- for (input in _activeInputs) {
- input.enableTimers = output == null
+ counters.demand += demand * deltaS
+ counters.actual += rate * deltaS
+ counters.remaining += (previousCapacity - rate) * deltaS
}
}
/**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
- private inner class Input(capacity: Double, val key: InterferenceKey?) :
- AbstractFlowConsumer(engine, capacity),
- FlowConsumerLogic,
- Comparable<Input> {
+ private class Input(
+ engine: FlowEngine,
+ private val scheduler: Scheduler,
+ private val interferenceDomain: InterferenceDomain?,
+ @JvmField val key: InterferenceKey?
+ ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> {
/**
* The requested limit.
*/
@@ -341,18 +468,18 @@ public class MaxMinFlowMultiplexer(
@JvmField var actualRate: Double = 0.0
/**
- * The processing rate that is allowed by the model constraints.
- */
- val allowedRate: Double
- get() = min(capacity, limit)
-
- /**
* The deadline of the input.
*/
val deadline: Long
get() = ctx?.deadline ?: Long.MAX_VALUE
/**
+ * The processing rate that is allowed by the model constraints.
+ */
+ val allowedRate: Double
+ get() = min(capacity, limit)
+
+ /**
* A flag to enable timers for the input.
*/
var enableTimers: Boolean = true
@@ -362,14 +489,18 @@ public class MaxMinFlowMultiplexer(
}
/**
- * A flag to indicate that the input is closed.
+ * A flag to control whether the input should converge.
*/
- private var _isClosed: Boolean = false
+ var shouldConsumerConverge: Boolean = true
+ set(value) {
+ field = value
+ ctx?.shouldConsumerConverge = value
+ }
/**
- * The interference domain this input belongs to.
+ * A flag to indicate that the input is closed.
*/
- private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain
+ private var _isClosed: Boolean = false
/**
* Close the input.
@@ -386,17 +517,8 @@ public class MaxMinFlowMultiplexer(
override fun start(ctx: FlowConsumerContext) {
check(!_isClosed) { "Cannot re-use closed input" }
-
- _activeInputs += this
-
- if (parent != null) {
- ctx.shouldConsumerConverge = true
- }
- enableTimers = _activationOutput == null // Disable timers of the source if one of the output manages it
-
+ scheduler.registerInput(this)
super.start(ctx)
-
- triggerScheduler(engine.clock.millis())
}
/* FlowConsumerLogic */
@@ -411,19 +533,7 @@ public class MaxMinFlowMultiplexer(
actualRate = 0.0
limit = rate
- triggerScheduler(now)
- }
-
- override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
- val lastConverge = _lastConverge
- val parent = parent
-
- if (parent != null && (lastConverge < now || _lastConvergeInput == null)) {
- _lastConverge = now
- _lastConvergeInput = this
-
- parent.onConverge(now, max(0, now - lastConverge))
- }
+ scheduler.trigger(now)
}
override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) {
@@ -432,18 +542,16 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
- // Assign a new input responsible for handling the convergence events
- if (_lastConvergeInput == this) {
- _lastConvergeInput = null
- }
-
- // Re-run scheduler to distribute new load
- triggerScheduler(now)
+ scheduler.deregisterInput(this, now)
// BUG: Cancel the connection so that `ctx` is set to `null`
cancel()
}
+ override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
+ scheduler.convergeInput(this, now)
+ }
+
/* Comparable */
override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate)
@@ -464,7 +572,7 @@ public class MaxMinFlowMultiplexer(
// Compute the performance penalty due to flow interference
val perfScore = if (interferenceDomain != null) {
- val load = _rate / _capacity
+ val load = scheduler.rate / scheduler.capacity
interferenceDomain.apply(key, load)
} else {
1.0
@@ -477,14 +585,14 @@ public class MaxMinFlowMultiplexer(
updateCounters(demand, actual, remaining)
- _counters.interference += actual * max(0.0, 1 - perfScore)
+ scheduler.counters.interference += actual * max(0.0, 1 - perfScore)
}
}
/**
* An internal [FlowSource] implementation for multiplexer outputs.
*/
- private inner class Output : FlowSource, Comparable<Output> {
+ private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> {
/**
* The active [FlowConnection] of this source.
*/
@@ -496,6 +604,22 @@ public class MaxMinFlowMultiplexer(
@JvmField var capacity: Double = 0.0
/**
+ * A flag to indicate that this output is the activation output.
+ */
+ var isActivationOutput: Boolean
+ get() = _isActivationOutput
+ set(value) {
+ _isActivationOutput = value
+ _conn?.shouldSourceConverge = value
+ }
+ private var _isActivationOutput: Boolean = false
+
+ /**
+ * A flag to indicate that the output is active.
+ */
+ @JvmField var isActive = false
+
+ /**
* Push the specified rate to the consumer.
*/
fun push(rate: Double) {
@@ -520,33 +644,29 @@ public class MaxMinFlowMultiplexer(
assert(_conn == null) { "Source running concurrently" }
_conn = conn
capacity = conn.capacity
- _activeOutputs.add(this)
+ isActive = true
- updateCapacity()
- updateActivationOutput()
+ scheduler.registerOutput(this)
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
_conn = null
capacity = 0.0
- _activeOutputs.remove(this)
+ isActive = false
- updateCapacity()
- updateActivationOutput()
-
- triggerScheduler(now)
+ scheduler.deregisterOutput(this, now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
val capacity = capacity
if (capacity != conn.capacity) {
this.capacity = capacity
- updateCapacity()
+ scheduler.updateCapacity()
}
- return if (_activationOutput == this) {
+ return if (_isActivationOutput) {
// If this output is the activation output, synchronously run the scheduler and return the new deadline
- val deadline = runScheduler(now)
+ val deadline = scheduler.runScheduler(now)
if (deadline == Long.MAX_VALUE)
deadline
else
@@ -554,11 +674,17 @@ public class MaxMinFlowMultiplexer(
} else {
// Output is not the activation output, so trigger activation output and do not install timer for this
// output (by returning `Long.MAX_VALUE`)
- triggerScheduler(now)
+ scheduler.trigger(now)
Long.MAX_VALUE
}
}
+ override fun onConverge(conn: FlowConnection, now: Long, delta: Long) {
+ if (_isActivationOutput) {
+ scheduler.convergeOutput(this, now)
+ }
+ }
+
override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)
}
}