diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-05 21:44:55 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-10-05 21:44:55 +0200 |
| commit | 3098eeb116a80ce12e6575e454d0448867478792 (patch) | |
| tree | c979d9dd492b9c811eabb6f1797428dcb9012bff /opendc-simulator/opendc-simulator-flow/src | |
| parent | b92d0e8703014f143ff0b1fe67de09fff6f867b1 (diff) | |
| parent | a6eec366f2a171f112a94d4ed50fe2c6521792a5 (diff) | |
merge: Add benchmarks for Capelin experiments
This pull request adds a JMH benchmark for the Capelin experiments, in order to concretely measure
the bottlenecks in these experiments.
* Do not prune invocations on sync engine invocation
* Add benchmark for Capelin experiment
* Extract scheduler for max min multiplexer
* Only sort outputs on capacity change
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
6 files changed, 544 insertions, 321 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index e927f81d..aabd2220 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.flow import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch -import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer @@ -39,61 +38,53 @@ import java.util.concurrent.TimeUnit @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) @OptIn(ExperimentalCoroutinesApi::class) class FlowBenchmarks { - private lateinit var scope: SimulationCoroutineScope - private lateinit var engine: FlowEngine + private lateinit var trace: Sequence<TraceFlowSource.Fragment> @Setup fun setUp() { - scope = SimulationCoroutineScope() - engine = FlowEngine(scope.coroutineContext, scope.clock) - } - - @State(Scope.Thread) - class Workload { - lateinit var trace: Sequence<TraceFlowSource.Fragment> - - @Setup - fun setUp() { - val random = ThreadLocalRandom.current() - val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } - trace = entries.asSequence() - } + val random = ThreadLocalRandom.current() + val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + trace = entries.asSequence() } @Benchmark - fun benchmarkSink(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkSink() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val provider = FlowSink(engine, 4200.0) - return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation provider.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkForward(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkForward() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val provider = FlowSink(engine, 4200.0) val forwarder = FlowForwarder(engine) provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation forwarder.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkMuxMaxMinSingleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxMaxMinSingleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = MaxMinFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() - return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation provider.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkMuxMaxMinTripleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxMaxMinTripleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = MaxMinFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) @@ -102,28 +93,30 @@ class FlowBenchmarks { repeat(3) { launch { val provider = switch.newInput() - provider.consume(TraceFlowSource(state.trace)) + provider.consume(TraceFlowSource(trace)) } } } } @Benchmark - fun benchmarkMuxExclusiveSingleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxExclusiveSingleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = ForwardingFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() - return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation provider.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkMuxExclusiveTripleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxExclusiveTripleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = ForwardingFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) @@ -132,7 +125,7 @@ class FlowBenchmarks { repeat(2) { launch { val provider = switch.newInput() - provider.consume(TraceFlowSource(state.trace)) + provider.consume(TraceFlowSource(trace)) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index 15f9b93b..d7182497 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -29,6 +29,11 @@ package org.opendc.simulator.flow */ public interface FlowConsumerContext : FlowConnection { /** + * The deadline of the source. + */ + public val deadline: Long + + /** * The capacity of the connection. */ public override var capacity: Double @@ -39,12 +44,17 @@ public interface FlowConsumerContext : FlowConnection { public var shouldConsumerConverge: Boolean /** + * A flag to control whether the timers for the [FlowSource] should be enabled. + */ + public var enableTimers: Boolean + + /** * Start the flow over the connection. */ public fun start() /** - * Synchronously flush the changes of the connection. + * Synchronously pull the source of the connection. */ - public fun flush() + public fun pullSync() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt index 81ed9f26..97d56fff 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -35,9 +35,10 @@ internal const val ConnState = 0b11 // Mask for accessing the state of the flow */ internal const val ConnPulled = 1 shl 2 // The source should be pulled internal const val ConnPushed = 1 shl 3 // The source has pushed a value -internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active -internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending -internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary +internal const val ConnClose = 1 shl 4 // The connection should be closed +internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active +internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer +internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9d36483e..9a568897 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -69,23 +69,30 @@ internal class FlowConsumerContextImpl( */ override val demand: Double get() = _demand + private var _demand: Double = 0.0 // The current (pending) demand of the source + + /** + * The deadline of the source. + */ + override val deadline: Long + get() = _deadline + private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer /** * Flags to control the convergence of the consumer and source. */ - override var shouldSourceConverge: Boolean = false + override var shouldSourceConverge: Boolean + get() = _flags and ConnConvergeSource == ConnConvergeSource set(value) { - field = value _flags = if (value) _flags or ConnConvergeSource else _flags and ConnConvergeSource.inv() } - override var shouldConsumerConverge: Boolean = false + override var shouldConsumerConverge: Boolean + get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer set(value) { - field = value - _flags = if (value) _flags or ConnConvergeConsumer @@ -94,15 +101,22 @@ internal class FlowConsumerContextImpl( } /** - * The clock to track simulation time. + * Flag to control the timers on the [FlowSource] */ - private val _clock = engine.clock + override var enableTimers: Boolean + get() = _flags and ConnDisableTimers != ConnDisableTimers + set(value) { + _flags = + if (!value) + _flags or ConnDisableTimers + else + _flags and ConnDisableTimers.inv() + } /** - * The current state of the connection. + * The clock to track simulation time. */ - private var _demand: Double = 0.0 // The current (pending) demand of the source - private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer + private val _clock = engine.clock /** * The flags of the flow connection, indicating certain actions. @@ -136,23 +150,17 @@ internal class FlowConsumerContextImpl( } override fun close() { - var flags = _flags + val flags = _flags if (flags and ConnState == ConnClosed) { return } - engine.batch { - // Mark the connection as closed and pulled (in order to converge) - flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled - _flags = flags - - if (flags and ConnUpdateActive == 0) { - val now = _clock.millis() - doStopSource(now) - - // FIX: Make sure the context converges - scheduleImmediate(now, flags) - } + // Toggle the close bit. In case no update is active, schedule a new update. + if (flags and ConnUpdateActive == 0) { + val now = _clock.millis() + scheduleImmediate(now, flags or ConnClose) + } else { + _flags = flags or ConnClose } } @@ -166,7 +174,7 @@ internal class FlowConsumerContextImpl( scheduleImmediate(_clock.millis(), flags or ConnPulled) } - override fun flush() { + override fun pullSync() { val flags = _flags // Do not attempt to flush the connection if the connection is closed or an update is already active @@ -174,7 +182,11 @@ internal class FlowConsumerContextImpl( return } - engine.scheduleSync(_clock.millis(), this) + val now = _clock.millis() + + if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { + engine.scheduleSync(now, this) + } } override fun push(rate: Double) { @@ -218,7 +230,7 @@ internal class FlowConsumerContextImpl( val deadline = _deadline val reachedDeadline = deadline == now - var newDeadline = deadline + var newDeadline: Long var hasUpdated = false try { @@ -245,9 +257,13 @@ internal class FlowConsumerContextImpl( deadline } + // Make the new deadline available for the consumer if it has changed + if (newDeadline != deadline) { + _deadline = newDeadline + } + // Push to the consumer if the rate of the source has changed (after a call to `push`) - val newState = flags and ConnState - if (newState == ConnActive && flags and ConnPushed != 0) { + if (flags and ConnPushed != 0) { val lastPush = _lastPush val delta = max(0, now - lastPush) @@ -260,17 +276,33 @@ internal class FlowConsumerContextImpl( // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags - } else if (newState == ConnClosed) { + } + + // Check whether the source or consumer have tried to close the connection + if (flags and ConnClose != 0) { hasUpdated = true // The source has called [FlowConnection.close], so clean up the connection doStopSource(now) + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + // We now also mark the connection as closed + flags = (_flags and ConnState.inv()) or ConnClosed + + _demand = 0.0 + newDeadline = Long.MAX_VALUE } } catch (cause: Throwable) { + hasUpdated = true + + // Clean up the connection + doFailSource(now, cause) + // Mark the connection as closed flags = (flags and ConnState.inv()) or ConnClosed - doFailSource(now, cause) + _demand = 0.0 + newDeadline = Long.MAX_VALUE } // Check whether the connection needs to be added to the visited queue. This is the case when: @@ -302,14 +334,16 @@ internal class FlowConsumerContextImpl( _timer } - // Set the new deadline and schedule a delayed update for that deadline - _deadline = newDeadline - // Check whether we need to schedule a new timer for this connection. That is the case when: // (1) The deadline is valid (not the maximum value) // (2) The connection is active - // (3) The current active timer for the connection points to a later deadline - if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) { + // (3) Timers are not disabled for the source + // (4) The current active timer for the connection points to a later deadline + if (newDeadline == Long.MAX_VALUE || + flags and ConnState != ConnActive || + flags and ConnDisableTimers != 0 || + (timer != null && newDeadline >= timer.target) + ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer return @@ -336,8 +370,8 @@ internal class FlowConsumerContextImpl( // The connection is converging now, so unset the convergence pending flag _flags = flags and ConnConvergePending.inv() - // Call the source converge callback if it has enabled convergence and the connection is active - if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) { + // Call the source converge callback if it has enabled convergence + if (flags and ConnConvergeSource != 0) { val delta = max(0, now - _lastSourceConvergence) _lastSourceConvergence = now @@ -352,7 +386,13 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, now, delta) } } catch (cause: Throwable) { + // Invoke the finish callbacks doFailSource(now, cause) + + // Mark the connection as closed + _flags = (_flags and ConnState.inv()) or ConnClosed + _demand = 0.0 + _deadline = Long.MAX_VALUE } } @@ -367,10 +407,6 @@ internal class FlowConsumerContextImpl( doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) - return - } finally { - _deadline = Long.MAX_VALUE - _demand = 0.0 } } @@ -383,9 +419,6 @@ internal class FlowConsumerContextImpl( } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) - } finally { - _deadline = Long.MAX_VALUE - _demand = 0.0 } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 019b5f10..a9234abf 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -82,7 +82,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va return } - runEngine(now) + doRunEngine(now) } /** @@ -100,7 +100,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va return } - runEngine(now) + doRunEngine(now) } override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) @@ -120,16 +120,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va } } - /** - * Run the engine and mark as active while running. - */ - private fun runEngine(now: Long) { - try { - batchIndex++ - doRunEngine(now) - } finally { - batchIndex-- - } + /* Runnable */ + override fun run() { + val now = clock.millis() + val invocation = futureInvocations.poll() // Clear invocation from future invocation queue + assert(now >= invocation.timestamp) { "Future invocations invariant violated" } + + doRunEngine(now) } /** @@ -141,44 +138,43 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va val futureInvocations = futureInvocations val visited = visited - // Remove any entries in the `futureInvocations` queue from the past - while (true) { - val head = futureInvocations.peek() - if (head == null || head.timestamp > now) { - break - } - futureInvocations.poll() - } - - // Execute all scheduled updates at current timestamp - while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target + try { + // Increment batch index so synchronous calls will not launch concurrent engine invocations + batchIndex++ - if (target > now) { - break - } + // Execute all scheduled updates at current timestamp + while (true) { + val timer = futureQueue.peek() ?: break + val target = timer.target - assert(target >= now) { "Internal inconsistency: found update of the past" } + if (target > now) { + break + } - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) - } + assert(target >= now) { "Internal inconsistency: found update of the past" } - // Repeat execution of all immediate updates until the system has converged to a steady-state - // We have to take into account that the onConverge callback can also trigger new actions. - do { - // Execute all immediate updates - while (true) { - val ctx = queue.poll() ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) + futureQueue.poll() + timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } - while (true) { - val ctx = visited.poll() ?: break - ctx.onConverge(now) - } - } while (queue.isNotEmpty()) + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val ctx = queue.poll() ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) + } + + while (true) { + val ctx = visited.poll() ?: break + ctx.onConverge(now) + } + } while (queue.isNotEmpty()) + } finally { + // Decrement batch index to indicate no engine is active at the moment + batchIndex-- + } // Schedule an engine invocation for the next update to occur. val headTimer = futureQueue.peek() @@ -195,24 +191,14 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * @param scheduled The queue of scheduled invocations. */ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { - while (true) { - val invocation = scheduled.peekFirst() - if (invocation == null || invocation.timestamp > target) { - // Case 2: A new timer was registered ahead of the other timers. - // Solution: Schedule a new scheduler invocation - @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context) - scheduled.addFirst(Invocation(target, handle)) - break - } else if (invocation.timestamp < target) { - // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted - // Solution: Cancel the next scheduler invocation - scheduled.pollFirst() - - invocation.cancel() - } else { - break - } + val head = scheduled.peek() + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout(target - now, this, context) + scheduled.addFirst(Invocation(target, handle)) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 5ff0fb8d..97059e93 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -38,7 +38,7 @@ import kotlin.math.min */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - private val parent: FlowConvergenceListener? = null, + parent: FlowConvergenceListener? = null, private val interferenceDomain: InterferenceDomain? = null ) : FlowMultiplexer { /** @@ -47,7 +47,6 @@ public class MaxMinFlowMultiplexer( override val inputs: Set<FlowConsumer> get() = _inputs private val _inputs = mutableSetOf<Input>() - private val _activeInputs = mutableListOf<Input>() /** * The outputs of the multiplexer. @@ -55,50 +54,38 @@ public class MaxMinFlowMultiplexer( override val outputs: Set<FlowSource> get() = _outputs private val _outputs = mutableSetOf<Output>() - private val _activeOutputs = mutableListOf<Output>() /** * The flow counters of this multiplexer. */ public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() + get() = scheduler.counters /** * The actual processing rate of the multiplexer. */ public override val rate: Double - get() = _rate - private var _rate = 0.0 + get() = scheduler.rate /** * The demanded processing rate of the input. */ public override val demand: Double - get() = _demand - private var _demand = 0.0 + get() = scheduler.demand /** * The capacity of the outputs. */ public override val capacity: Double - get() = _capacity - private var _capacity = 0.0 + get() = scheduler.capacity /** - * Flag to indicate that the scheduler is active. + * The [Scheduler] instance of this multiplexer. */ - private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE - - /** - * The last convergence timestamp and the input. - */ - private var _lastConverge: Long = Long.MIN_VALUE - private var _lastConvergeInput: Input? = null + private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(_capacity, key) + val provider = Input(engine, scheduler, interferenceDomain, key) _inputs.add(provider) return provider } @@ -112,7 +99,7 @@ public class MaxMinFlowMultiplexer( } override fun newOutput(): FlowSource { - val output = Output() + val output = Output(scheduler) _outputs.add(output) return output } @@ -146,147 +133,330 @@ public class MaxMinFlowMultiplexer( } /** - * Converge the scheduler of the multiplexer. + * Helper class containing the scheduler state. */ - private fun runScheduler(now: Long) { - if (_schedulerActive) { - return - } - val lastSchedulerCycle = _lastSchedulerCycle - val delta = max(0, now - lastSchedulerCycle) - _schedulerActive = true - _lastSchedulerCycle = now - - try { - doSchedule(now, delta) - } finally { - _schedulerActive = false - } - } + private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + /** + * The flow counters of this scheduler. + */ + @JvmField val counters = FlowCountersImpl() - /** - * Schedule the inputs over the outputs. - */ - private fun doSchedule(now: Long, delta: Long) { - val activeInputs = _activeInputs - val activeOutputs = _activeOutputs + /** + * The flow rate of the multiplexer. + */ + @JvmField var rate = 0.0 - // Update the counters of the scheduler - updateCounters(delta) + /** + * The demand for the multiplexer. + */ + @JvmField var demand = 0.0 - // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { - _demand = 0.0 - _rate = 0.0 - return + /** + * The capacity of the multiplexer. + */ + @JvmField var capacity = 0.0 + + /** + * An [Output] that is used to activate the scheduler. + */ + @JvmField var activationOutput: Output? = null + + /** + * The active inputs registered with the scheduler. + */ + private val _activeInputs = mutableListOf<Input>() + + /** + * The active outputs registered with the scheduler. + */ + private val _activeOutputs = mutableListOf<Output>() + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null + + /** + * Register the specified [input] to this scheduler. + */ + fun registerInput(input: Input) { + _activeInputs.add(input) + + val hasActivationOutput = activationOutput != null + + // Disable timers and convergence of the source if one of the output manages it + input.shouldConsumerConverge = !hasActivationOutput + input.enableTimers = !hasActivationOutput + input.capacity = capacity + trigger(engine.clock.millis()) + } + + /** + * De-register the specified [input] from this scheduler. + */ + fun deregisterInput(input: Input, now: Long) { + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == input) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + trigger(now) } - val capacity = _capacity - var availableCapacity = capacity + /** + * This method is invoked when one of the inputs converges. + */ + fun convergeInput(input: Input, now: Long) { + + val lastConverge = _lastConverge + val lastConvergeInput = _lastConvergeInput + val parent = parent - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { - input.pull(now) + if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) { + _lastConverge = now + _lastConvergeInput = input - // Remove outputs that have finished - if (!input.isActive) { - input.actualRate = 0.0 - inputIterator.remove() + parent.onConverge(now, max(0, now - lastConverge)) } } - var demand = 0.0 + /** + * Register the specified [output] to this scheduler. + */ + fun registerOutput(output: Output) { + _activeOutputs.add(output) + + updateCapacity() + updateActivationOutput() + } + + /** + * De-register the specified [output] from this scheduler. + */ + fun deregisterOutput(output: Output, now: Long) { + _activeOutputs.remove(output) + updateCapacity() - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + trigger(now) + } + + /** + * This method is invoked when one of the outputs converges. + */ + fun convergeOutput(output: Output, now: Long) { + val lastConverge = _lastConverge + val parent = parent - // Divide the available output capacity fairly over the inputs using max-min fair sharing - var remaining = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / remaining-- - val grantedRate = min(input.allowedRate, availableShare) + if (parent != null) { + _lastConverge = now - // Ignore empty sources - if (grantedRate <= 0.0) { - input.actualRate = 0.0 - continue + parent.onConverge(now, max(0, now - lastConverge)) } - input.actualRate = grantedRate - demand += input.limit - availableCapacity -= grantedRate + if (!output.isActive) { + output.isActivationOutput = false + updateActivationOutput() + } } - val rate = capacity - availableCapacity + /** + * Trigger the scheduler of the multiplexer. + * + * @param now The current virtual timestamp of the simulation. + */ + fun trigger(now: Long) { + if (_schedulerActive) { + // No need to trigger the scheduler in case it is already active + return + } + + val activationOutput = activationOutput - _demand = demand - _rate = rate + // We can run the scheduler in two ways: + // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input + // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. + // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only + // a few inputs and little changes at the same timestamp. + // We always pick for option (1) unless there are no outputs available. + if (activationOutput != null) { + activationOutput.pull() + return + } else { + runScheduler(now) + } + } - // Sort all consumers by their capacity - activeOutputs.sort() + /** + * Synchronously run the scheduler of the multiplexer. + */ + fun runScheduler(now: Long): Long { + val lastSchedulerCycle = _lastSchedulerCycle + _lastSchedulerCycle = now - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction + val delta = max(0, now - lastSchedulerCycle) - output.push(grantedSpeed) + return try { + _schedulerActive = true + doRunScheduler(delta) + } finally { + _schedulerActive = false + } } - } - /** - * Recompute the capacity of the multiplexer. - */ - private fun updateCapacity() { - val newCapacity = _activeOutputs.sumOf(Output::capacity) + /** + * Recompute the capacity of the multiplexer. + */ + fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) - // No-op if the capacity is unchanged - if (_capacity == newCapacity) { - return + // No-op if the capacity is unchanged + if (capacity == newCapacity) { + return + } + + capacity = newCapacity + + for (input in _activeInputs) { + input.capacity = newCapacity + } + + // Sort outputs by their capacity + _activeOutputs.sort() } - _capacity = newCapacity + /** + * Updates the output that is used for scheduler activation. + */ + private fun updateActivationOutput() { + val output = _activeOutputs.firstOrNull() + activationOutput = output - for (input in _inputs) { - input.capacity = newCapacity + if (output != null) { + output.isActivationOutput = true + } + + val hasActivationOutput = output != null + + for (input in _activeInputs) { + input.shouldConsumerConverge = !hasActivationOutput + input.enableTimers = !hasActivationOutput + } } - } - /** - * The previous capacity of the multiplexer. - */ - private var _previousCapacity = 0.0 + /** + * Schedule the inputs over the outputs. + * + * @return The deadline after which a new scheduling cycle should start. + */ + private fun doRunScheduler(delta: Long): Long { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // Update the counters of the scheduler + updateCounters(delta) + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + demand = 0.0 + rate = 0.0 + return Long.MAX_VALUE + } - /** - * Update the counters of the scheduler. - */ - private fun updateCounters(delta: Long) { - val previousCapacity = _previousCapacity - _previousCapacity = _capacity + val capacity = capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pullSync() + + // Remove outputs that have finished + if (!input.isActive) { + input.actualRate = 0.0 + inputIterator.remove() + } + } - if (delta <= 0) { - return + var demand = 0.0 + var deadline = Long.MAX_VALUE + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + val size = activeInputs.size + for (i in activeInputs.indices) { + val input = activeInputs[i] + val availableShare = availableCapacity / (size - i) + val grantedRate = min(input.allowedRate, availableShare) + + demand += input.limit + deadline = min(deadline, input.deadline) + availableCapacity -= grantedRate + + input.actualRate = grantedRate + } + + val rate = capacity - availableCapacity + + this.demand = demand + this.rate = rate + + // Divide the requests over the available capacity of the input resources fairly + for (i in activeOutputs.indices) { + val output = activeOutputs[i] + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + + return deadline } - val deltaS = delta / 1000.0 + /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 + + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = capacity + + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 - _counters.demand += _demand * deltaS - _counters.actual += _rate * deltaS - _counters.remaining += (previousCapacity - _rate) * deltaS + counters.demand += demand * deltaS + counters.actual += rate * deltaS + counters.remaining += (previousCapacity - rate) * deltaS + } } /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ - private inner class Input(capacity: Double, val key: InterferenceKey?) : - AbstractFlowConsumer(engine, capacity), - FlowConsumerLogic, - Comparable<Input> { + private class Input( + engine: FlowEngine, + private val scheduler: Scheduler, + private val interferenceDomain: InterferenceDomain?, + @JvmField val key: InterferenceKey? + ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> { /** * The requested limit. */ @@ -298,25 +468,39 @@ public class MaxMinFlowMultiplexer( @JvmField var actualRate: Double = 0.0 /** + * The deadline of the input. + */ + val deadline: Long + get() = ctx?.deadline ?: Long.MAX_VALUE + + /** * The processing rate that is allowed by the model constraints. */ val allowedRate: Double get() = min(capacity, limit) /** - * A flag to indicate that the input is closed. + * A flag to enable timers for the input. */ - private var _isClosed: Boolean = false + var enableTimers: Boolean = true + set(value) { + field = value + ctx?.enableTimers = value + } /** - * The timestamp at which we received the last command. + * A flag to control whether the input should converge. */ - private var _lastPull: Long = Long.MIN_VALUE + var shouldConsumerConverge: Boolean = true + set(value) { + field = value + ctx?.shouldConsumerConverge = value + } /** - * The interference domain this input belongs to. + * A flag to indicate that the input is closed. */ - private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + private var _isClosed: Boolean = false /** * Close the input. @@ -333,12 +517,7 @@ public class MaxMinFlowMultiplexer( override fun start(ctx: FlowConsumerContext) { check(!_isClosed) { "Cannot re-use closed input" } - - _activeInputs += this - if (parent != null) { - ctx.shouldConsumerConverge = true - } - + scheduler.registerInput(this) super.start(ctx) } @@ -353,21 +532,8 @@ public class MaxMinFlowMultiplexer( actualRate = 0.0 limit = rate - _lastPull = now - - runScheduler(now) - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - val lastConverge = _lastConverge - val parent = parent - - if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { - _lastConverge = now - _lastConvergeInput = this - parent.onConverge(now, max(0, now - lastConverge)) - } + scheduler.trigger(now) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -375,15 +541,15 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 - _lastPull = now - // Assign a new input responsible for handling the convergence events - if (_lastConvergeInput == this) { - _lastConvergeInput = null - } + scheduler.deregisterInput(this, now) - // Re-run scheduler to distribute new load - runScheduler(now) + // BUG: Cancel the connection so that `ctx` is set to `null` + cancel() + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + scheduler.convergeInput(this, now) } /* Comparable */ @@ -392,11 +558,8 @@ public class MaxMinFlowMultiplexer( /** * Pull the source if necessary. */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && _lastPull < now) { - ctx.flush() - } + fun pullSync() { + ctx?.pullSync() } /** @@ -409,7 +572,7 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / _capacity + val load = scheduler.rate / scheduler.capacity interferenceDomain.apply(key, load) } else { 1.0 @@ -422,14 +585,14 @@ public class MaxMinFlowMultiplexer( updateCounters(demand, actual, remaining) - _counters.interference += actual * max(0.0, 1 - perfScore) + scheduler.counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output : FlowSource, Comparable<Output> { + private class Output(private val scheduler: Scheduler) : FlowSource, Comparable<Output> { /** * The active [FlowConnection] of this source. */ @@ -441,6 +604,22 @@ public class MaxMinFlowMultiplexer( @JvmField var capacity: Double = 0.0 /** + * A flag to indicate that this output is the activation output. + */ + var isActivationOutput: Boolean + get() = _isActivationOutput + set(value) { + _isActivationOutput = value + _conn?.shouldSourceConverge = value + } + private var _isActivationOutput: Boolean = false + + /** + * A flag to indicate that the output is active. + */ + @JvmField var isActive = false + + /** * Push the specified rate to the consumer. */ fun push(rate: Double) { @@ -454,35 +633,56 @@ public class MaxMinFlowMultiplexer( _conn?.close() } + /** + * Pull this output. + */ + fun pull() { + _conn?.pull() + } + override fun onStart(conn: FlowConnection, now: Long) { assert(_conn == null) { "Source running concurrently" } _conn = conn capacity = conn.capacity - _activeOutputs.add(this) + isActive = true - updateCapacity() + scheduler.registerOutput(this) } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { _conn = null capacity = 0.0 - _activeOutputs.remove(this) + isActive = false - updateCapacity() - - runScheduler(now) + scheduler.deregisterOutput(this, now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity - updateCapacity() + scheduler.updateCapacity() } - // Re-run scheduler to distribute new load - runScheduler(now) - return Long.MAX_VALUE + return if (_isActivationOutput) { + // If this output is the activation output, synchronously run the scheduler and return the new deadline + val deadline = scheduler.runScheduler(now) + if (deadline == Long.MAX_VALUE) + deadline + else + deadline - now + } else { + // Output is not the activation output, so trigger activation output and do not install timer for this + // output (by returning `Long.MAX_VALUE`) + scheduler.trigger(now) + Long.MAX_VALUE + } + } + + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + if (_isActivationOutput) { + scheduler.convergeOutput(this, now) + } } override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) |
