From 5c4f9d936d7c08e8ad2705ed3dde5ea8dcd2ee64 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 4 Oct 2021 16:43:02 +0200 Subject: perf(simulator): Do not prune invocations on sync engine invocation --- .../simulator/flow/internal/FlowEngineImpl.kt | 112 +++++++++------------ 1 file changed, 49 insertions(+), 63 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 019b5f10..a9234abf 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine { +internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -82,7 +82,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va return } - runEngine(now) + doRunEngine(now) } /** @@ -100,7 +100,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va return } - runEngine(now) + doRunEngine(now) } override fun newContext(consumer: FlowSource, provider: FlowConsumerLogic): FlowConsumerContext = FlowConsumerContextImpl(this, consumer, provider) @@ -120,16 +120,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va } } - /** - * Run the engine and mark as active while running. - */ - private fun runEngine(now: Long) { - try { - batchIndex++ - doRunEngine(now) - } finally { - batchIndex-- - } + /* Runnable */ + override fun run() { + val now = clock.millis() + val invocation = futureInvocations.poll() // Clear invocation from future invocation queue + assert(now >= invocation.timestamp) { "Future invocations invariant violated" } + + doRunEngine(now) } /** @@ -141,44 +138,43 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va val futureInvocations = futureInvocations val visited = visited - // Remove any entries in the `futureInvocations` queue from the past - while (true) { - val head = futureInvocations.peek() - if (head == null || head.timestamp > now) { - break - } - futureInvocations.poll() - } - - // Execute all scheduled updates at current timestamp - while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target + try { + // Increment batch index so synchronous calls will not launch concurrent engine invocations + batchIndex++ - if (target > now) { - break - } + // Execute all scheduled updates at current timestamp + while (true) { + val timer = futureQueue.peek() ?: break + val target = timer.target - assert(target >= now) { "Internal inconsistency: found update of the past" } + if (target > now) { + break + } - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) - } + assert(target >= now) { "Internal inconsistency: found update of the past" } - // Repeat execution of all immediate updates until the system has converged to a steady-state - // We have to take into account that the onConverge callback can also trigger new actions. - do { - // Execute all immediate updates - while (true) { - val ctx = queue.poll() ?: break - ctx.doUpdate(now, visited, futureQueue, isImmediate = true) + futureQueue.poll() + timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } - while (true) { - val ctx = visited.poll() ?: break - ctx.onConverge(now) - } - } while (queue.isNotEmpty()) + // Repeat execution of all immediate updates until the system has converged to a steady-state + // We have to take into account that the onConverge callback can also trigger new actions. + do { + // Execute all immediate updates + while (true) { + val ctx = queue.poll() ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = true) + } + + while (true) { + val ctx = visited.poll() ?: break + ctx.onConverge(now) + } + } while (queue.isNotEmpty()) + } finally { + // Decrement batch index to indicate no engine is active at the moment + batchIndex-- + } // Schedule an engine invocation for the next update to occur. val headTimer = futureQueue.peek() @@ -195,24 +191,14 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va * @param scheduled The queue of scheduled invocations. */ private fun trySchedule(now: Long, scheduled: ArrayDeque, target: Long) { - while (true) { - val invocation = scheduled.peekFirst() - if (invocation == null || invocation.timestamp > target) { - // Case 2: A new timer was registered ahead of the other timers. - // Solution: Schedule a new scheduler invocation - @OptIn(InternalCoroutinesApi::class) - val handle = delay.invokeOnTimeout(target - now, { runEngine(target) }, context) - scheduled.addFirst(Invocation(target, handle)) - break - } else if (invocation.timestamp < target) { - // Case 2: A timer was cancelled and the head of the timer queue is now later than excepted - // Solution: Cancel the next scheduler invocation - scheduled.pollFirst() - - invocation.cancel() - } else { - break - } + val head = scheduled.peek() + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout(target - now, this, context) + scheduled.addFirst(Invocation(target, handle)) } } -- cgit v1.2.3 From 557797c63c19e80c908eccc96472f215eab0c2f3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 4 Oct 2021 19:26:29 +0200 Subject: perf(experiments): Add benchmark for Capelin experiment --- .../org/opendc/simulator/flow/FlowBenchmarks.kt | 63 ++++++++++------------ 1 file changed, 28 insertions(+), 35 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt index e927f81d..aabd2220 100644 --- a/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt +++ b/opendc-simulator/opendc-simulator-flow/src/jmh/kotlin/org/opendc/simulator/flow/FlowBenchmarks.kt @@ -24,7 +24,6 @@ package org.opendc.simulator.flow import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch -import org.opendc.simulator.core.SimulationCoroutineScope import org.opendc.simulator.core.runBlockingSimulation import org.opendc.simulator.flow.mux.ForwardingFlowMultiplexer import org.opendc.simulator.flow.mux.MaxMinFlowMultiplexer @@ -39,61 +38,53 @@ import java.util.concurrent.TimeUnit @Measurement(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) @OptIn(ExperimentalCoroutinesApi::class) class FlowBenchmarks { - private lateinit var scope: SimulationCoroutineScope - private lateinit var engine: FlowEngine + private lateinit var trace: Sequence @Setup fun setUp() { - scope = SimulationCoroutineScope() - engine = FlowEngine(scope.coroutineContext, scope.clock) - } - - @State(Scope.Thread) - class Workload { - lateinit var trace: Sequence - - @Setup - fun setUp() { - val random = ThreadLocalRandom.current() - val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } - trace = entries.asSequence() - } + val random = ThreadLocalRandom.current() + val entries = List(10000) { TraceFlowSource.Fragment(1000, random.nextDouble(0.0, 4500.0)) } + trace = entries.asSequence() } @Benchmark - fun benchmarkSink(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkSink() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val provider = FlowSink(engine, 4200.0) - return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation provider.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkForward(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkForward() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val provider = FlowSink(engine, 4200.0) val forwarder = FlowForwarder(engine) provider.startConsumer(forwarder) - return@runBlockingSimulation forwarder.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation forwarder.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkMuxMaxMinSingleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxMaxMinSingleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = MaxMinFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() - return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation provider.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkMuxMaxMinTripleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxMaxMinTripleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = MaxMinFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) @@ -102,28 +93,30 @@ class FlowBenchmarks { repeat(3) { launch { val provider = switch.newInput() - provider.consume(TraceFlowSource(state.trace)) + provider.consume(TraceFlowSource(trace)) } } } } @Benchmark - fun benchmarkMuxExclusiveSingleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxExclusiveSingleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = ForwardingFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) val provider = switch.newInput() - return@runBlockingSimulation provider.consume(TraceFlowSource(state.trace)) + return@runBlockingSimulation provider.consume(TraceFlowSource(trace)) } } @Benchmark - fun benchmarkMuxExclusiveTripleSource(state: Workload) { - return scope.runBlockingSimulation { + fun benchmarkMuxExclusiveTripleSource() { + return runBlockingSimulation { + val engine = FlowEngine(coroutineContext, clock) val switch = ForwardingFlowMultiplexer(engine) FlowSink(engine, 3000.0).startConsumer(switch.newOutput()) @@ -132,7 +125,7 @@ class FlowBenchmarks { repeat(2) { launch { val provider = switch.newInput() - provider.consume(TraceFlowSource(state.trace)) + provider.consume(TraceFlowSource(trace)) } } } -- cgit v1.2.3 From 7c260ab0b083488b8855f61648548a40401cf62e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 4 Oct 2021 21:50:38 +0200 Subject: perf(simulator): Manage deadlines centrally in max min mux This change updates the MaxMinFlowMultiplexer implementation to centrally manage the deadlines of the `FlowSource`s as opposed to each source using its own timers. For large amounts of inputs, this is much faster as the multiplexer already needs to traverse each input on a timer expiration of an input. --- .../opendc/simulator/flow/FlowConsumerContext.kt | 14 ++- .../org/opendc/simulator/flow/internal/Flags.kt | 1 + .../flow/internal/FlowConsumerContextImpl.kt | 45 +++++-- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 140 ++++++++++++++++----- 4 files changed, 152 insertions(+), 48 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index 15f9b93b..d7182497 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -28,6 +28,11 @@ package org.opendc.simulator.flow * This interface is used by [FlowConsumer]s to control the connection between it and the source. */ public interface FlowConsumerContext : FlowConnection { + /** + * The deadline of the source. + */ + public val deadline: Long + /** * The capacity of the connection. */ @@ -38,13 +43,18 @@ public interface FlowConsumerContext : FlowConnection { */ public var shouldConsumerConverge: Boolean + /** + * A flag to control whether the timers for the [FlowSource] should be enabled. + */ + public var enableTimers: Boolean + /** * Start the flow over the connection. */ public fun start() /** - * Synchronously flush the changes of the connection. + * Synchronously pull the source of the connection. */ - public fun flush() + public fun pullSync() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt index 81ed9f26..939c5c98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -41,3 +41,4 @@ internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection wa internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer +internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9d36483e..c7a8c3de 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -69,23 +69,30 @@ internal class FlowConsumerContextImpl( */ override val demand: Double get() = _demand + private var _demand: Double = 0.0 // The current (pending) demand of the source + + /** + * The deadline of the source. + */ + override val deadline: Long + get() = _deadline + private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer /** * Flags to control the convergence of the consumer and source. */ - override var shouldSourceConverge: Boolean = false + override var shouldSourceConverge: Boolean + get() = _flags and ConnConvergeSource == ConnConvergeSource set(value) { - field = value _flags = if (value) _flags or ConnConvergeSource else _flags and ConnConvergeSource.inv() } - override var shouldConsumerConverge: Boolean = false + override var shouldConsumerConverge: Boolean + get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer set(value) { - field = value - _flags = if (value) _flags or ConnConvergeConsumer @@ -94,15 +101,22 @@ internal class FlowConsumerContextImpl( } /** - * The clock to track simulation time. + * Flag to control the timers on the [FlowSource] */ - private val _clock = engine.clock + override var enableTimers: Boolean + get() = _flags and ConnDisableTimers != ConnDisableTimers + set(value) { + _flags = + if (!value) + _flags or ConnDisableTimers + else + _flags and ConnDisableTimers.inv() + } /** - * The current state of the connection. + * The clock to track simulation time. */ - private var _demand: Double = 0.0 // The current (pending) demand of the source - private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer + private val _clock = engine.clock /** * The flags of the flow connection, indicating certain actions. @@ -166,7 +180,7 @@ internal class FlowConsumerContextImpl( scheduleImmediate(_clock.millis(), flags or ConnPulled) } - override fun flush() { + override fun pullSync() { val flags = _flags // Do not attempt to flush the connection if the connection is closed or an update is already active @@ -308,8 +322,13 @@ internal class FlowConsumerContextImpl( // Check whether we need to schedule a new timer for this connection. That is the case when: // (1) The deadline is valid (not the maximum value) // (2) The connection is active - // (3) The current active timer for the connection points to a later deadline - if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) { + // (3) Timers are not disabled for the source + // (4) The current active timer for the connection points to a later deadline + if (newDeadline == Long.MAX_VALUE || + flags and ConnState != ConnActive || + flags and ConnDisableTimers != 0 || + (timer != null && newDeadline >= timer.target) + ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer return diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 5ff0fb8d..22f6516d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -97,6 +97,11 @@ public class MaxMinFlowMultiplexer( private var _lastConverge: Long = Long.MIN_VALUE private var _lastConvergeInput: Input? = null + /** + * An [Output] that is used to activate the scheduler. + */ + private var _activationOutput: Output? = null + override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) _inputs.add(provider) @@ -146,19 +151,44 @@ public class MaxMinFlowMultiplexer( } /** - * Converge the scheduler of the multiplexer. + * Trigger the scheduler of the multiplexer. + * + * @param now The current virtual timestamp of the simulation. */ - private fun runScheduler(now: Long) { + private fun triggerScheduler(now: Long) { if (_schedulerActive) { + // No need to trigger the scheduler in case it is already active + return + } + + val activationOutput = _activationOutput + + // We can run the scheduler in two ways: + // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input + // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. + // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only + // a few inputs and little changes at the same timestamp. + // We always pick for option (1) unless there are no outputs available. + if (activationOutput != null) { + activationOutput.pull() return + } else { + runScheduler(now) } + } + + /** + * Synchronously run the scheduler of the multiplexer. + */ + private fun runScheduler(now: Long): Long { val lastSchedulerCycle = _lastSchedulerCycle - val delta = max(0, now - lastSchedulerCycle) - _schedulerActive = true _lastSchedulerCycle = now - try { - doSchedule(now, delta) + val delta = max(0, now - lastSchedulerCycle) + + return try { + _schedulerActive = true + doSchedule(delta) } finally { _schedulerActive = false } @@ -166,8 +196,10 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. + * + * @return The deadline after which a new scheduling cycle should start. */ - private fun doSchedule(now: Long, delta: Long) { + private fun doSchedule(delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs @@ -178,7 +210,7 @@ public class MaxMinFlowMultiplexer( if (activeInputs.isEmpty()) { _demand = 0.0 _rate = 0.0 - return + return Long.MAX_VALUE } val capacity = _capacity @@ -187,7 +219,7 @@ public class MaxMinFlowMultiplexer( // Pull in the work of the outputs val inputIterator = activeInputs.listIterator() for (input in inputIterator) { - input.pull(now) + input.pullSync() // Remove outputs that have finished if (!input.isActive) { @@ -197,6 +229,7 @@ public class MaxMinFlowMultiplexer( } var demand = 0.0 + var deadline = Long.MAX_VALUE // Sort in-place the inputs based on their pushed flow. // Profiling shows that it is faster than maintaining some kind of sorted set. @@ -209,15 +242,11 @@ public class MaxMinFlowMultiplexer( val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) - // Ignore empty sources - if (grantedRate <= 0.0) { - input.actualRate = 0.0 - continue - } - - input.actualRate = grantedRate demand += input.limit + deadline = min(deadline, input.deadline) availableCapacity -= grantedRate + + input.actualRate = grantedRate } val rate = capacity - availableCapacity @@ -237,6 +266,8 @@ public class MaxMinFlowMultiplexer( output.push(grantedSpeed) } + + return deadline } /** @@ -280,6 +311,18 @@ public class MaxMinFlowMultiplexer( _counters.remaining += (previousCapacity - _rate) * deltaS } + /** + * Updates the output that is used for scheduler activation. + */ + private fun updateActivationOutput() { + val output = _activeOutputs.firstOrNull() + _activationOutput = output + + for (input in _activeInputs) { + input.enableTimers = output == null + } + } + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ @@ -304,14 +347,24 @@ public class MaxMinFlowMultiplexer( get() = min(capacity, limit) /** - * A flag to indicate that the input is closed. + * The deadline of the input. */ - private var _isClosed: Boolean = false + val deadline: Long + get() = ctx?.deadline ?: Long.MAX_VALUE + + /** + * A flag to enable timers for the input. + */ + var enableTimers: Boolean = true + set(value) { + field = value + ctx?.enableTimers = value + } /** - * The timestamp at which we received the last command. + * A flag to indicate that the input is closed. */ - private var _lastPull: Long = Long.MIN_VALUE + private var _isClosed: Boolean = false /** * The interference domain this input belongs to. @@ -335,11 +388,15 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this + if (parent != null) { ctx.shouldConsumerConverge = true } + enableTimers = _activationOutput == null // Disable timers of the source if one of the output manages it super.start(ctx) + + triggerScheduler(engine.clock.millis()) } /* FlowConsumerLogic */ @@ -353,9 +410,8 @@ public class MaxMinFlowMultiplexer( actualRate = 0.0 limit = rate - _lastPull = now - runScheduler(now) + triggerScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -375,7 +431,6 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 - _lastPull = now // Assign a new input responsible for handling the convergence events if (_lastConvergeInput == this) { @@ -383,7 +438,10 @@ public class MaxMinFlowMultiplexer( } // Re-run scheduler to distribute new load - runScheduler(now) + triggerScheduler(now) + + // BUG: Cancel the connection so that `ctx` is set to `null` + cancel() } /* Comparable */ @@ -392,11 +450,8 @@ public class MaxMinFlowMultiplexer( /** * Pull the source if necessary. */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && _lastPull < now) { - ctx.flush() - } + fun pullSync() { + ctx?.pullSync() } /** @@ -454,6 +509,13 @@ public class MaxMinFlowMultiplexer( _conn?.close() } + /** + * Pull this output. + */ + fun pull() { + _conn?.pull() + } + override fun onStart(conn: FlowConnection, now: Long) { assert(_conn == null) { "Source running concurrently" } _conn = conn @@ -461,6 +523,7 @@ public class MaxMinFlowMultiplexer( _activeOutputs.add(this) updateCapacity() + updateActivationOutput() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { @@ -469,8 +532,9 @@ public class MaxMinFlowMultiplexer( _activeOutputs.remove(this) updateCapacity() + updateActivationOutput() - runScheduler(now) + triggerScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -480,9 +544,19 @@ public class MaxMinFlowMultiplexer( updateCapacity() } - // Re-run scheduler to distribute new load - runScheduler(now) - return Long.MAX_VALUE + return if (_activationOutput == this) { + // If this output is the activation output, synchronously run the scheduler and return the new deadline + val deadline = runScheduler(now) + if (deadline == Long.MAX_VALUE) + deadline + else + deadline - now + } else { + // Output is not the activation output, so trigger activation output and do not install timer for this + // output (by returning `Long.MAX_VALUE`) + triggerScheduler(now) + Long.MAX_VALUE + } } override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) -- cgit v1.2.3 From 0ec821157767a630ff82f980f4990ec9d1b75573 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 Oct 2021 11:05:23 +0200 Subject: refactor(simulator): Extract scheduler for max min multiplexer --- .../org/opendc/simulator/flow/internal/Flags.kt | 6 +- .../flow/internal/FlowConsumerContextImpl.kt | 70 +-- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 554 +++++++++++++-------- 3 files changed, 383 insertions(+), 247 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt index 939c5c98..97d56fff 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -35,9 +35,9 @@ internal const val ConnState = 0b11 // Mask for accessing the state of the flow */ internal const val ConnPulled = 1 shl 2 // The source should be pulled internal const val ConnPushed = 1 shl 3 // The source has pushed a value -internal const val ConnUpdateActive = 1 shl 4 // An update for the connection is active -internal const val ConnUpdatePending = 1 shl 5 // An (immediate) update of the connection is pending -internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection was not necessary +internal const val ConnClose = 1 shl 4 // The connection should be closed +internal const val ConnUpdateActive = 1 shl 5 // An update for the connection is active +internal const val ConnUpdatePending = 1 shl 6 // An (immediate) update of the connection is pending internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index c7a8c3de..f15d7fb0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -150,23 +150,17 @@ internal class FlowConsumerContextImpl( } override fun close() { - var flags = _flags + val flags = _flags if (flags and ConnState == ConnClosed) { return } - engine.batch { - // Mark the connection as closed and pulled (in order to converge) - flags = (flags and ConnState.inv()) or ConnClosed or ConnPulled - _flags = flags - - if (flags and ConnUpdateActive == 0) { - val now = _clock.millis() - doStopSource(now) - - // FIX: Make sure the context converges - scheduleImmediate(now, flags) - } + // Toggle the close bit. In case no update is active, schedule a new update. + if (flags and ConnUpdateActive == 0) { + val now = _clock.millis() + scheduleImmediate(now, flags or ConnClose) + } else { + _flags = flags or ConnClose } } @@ -232,7 +226,7 @@ internal class FlowConsumerContextImpl( val deadline = _deadline val reachedDeadline = deadline == now - var newDeadline = deadline + var newDeadline: Long var hasUpdated = false try { @@ -259,9 +253,13 @@ internal class FlowConsumerContextImpl( deadline } + // Make the new deadline available for the consumer if it has changed + if (newDeadline != deadline) { + _deadline = newDeadline + } + // Push to the consumer if the rate of the source has changed (after a call to `push`) - val newState = flags and ConnState - if (newState == ConnActive && flags and ConnPushed != 0) { + if (flags and ConnPushed != 0) { val lastPush = _lastPush val delta = max(0, now - lastPush) @@ -274,17 +272,33 @@ internal class FlowConsumerContextImpl( // IMPORTANT: Re-fetch the flags after the callback might have changed those flags = _flags - } else if (newState == ConnClosed) { + } + + // Check whether the source or consumer have tried to close the connection + if (flags and ConnClose != 0) { hasUpdated = true // The source has called [FlowConnection.close], so clean up the connection doStopSource(now) + + // IMPORTANT: Re-fetch the flags after the callback might have changed those + // We now also mark the connection as closed + flags = (_flags and ConnState.inv()) or ConnClosed + + _demand = 0.0 + newDeadline = Long.MAX_VALUE } } catch (cause: Throwable) { + hasUpdated = true + + // Clean up the connection + doFailSource(now, cause) + // Mark the connection as closed flags = (flags and ConnState.inv()) or ConnClosed - doFailSource(now, cause) + _demand = 0.0 + newDeadline = Long.MAX_VALUE } // Check whether the connection needs to be added to the visited queue. This is the case when: @@ -316,9 +330,6 @@ internal class FlowConsumerContextImpl( _timer } - // Set the new deadline and schedule a delayed update for that deadline - _deadline = newDeadline - // Check whether we need to schedule a new timer for this connection. That is the case when: // (1) The deadline is valid (not the maximum value) // (2) The connection is active @@ -355,8 +366,8 @@ internal class FlowConsumerContextImpl( // The connection is converging now, so unset the convergence pending flag _flags = flags and ConnConvergePending.inv() - // Call the source converge callback if it has enabled convergence and the connection is active - if (flags and ConnState == ConnActive && flags and ConnConvergeSource != 0) { + // Call the source converge callback if it has enabled convergence + if (flags and ConnConvergeSource != 0) { val delta = max(0, now - _lastSourceConvergence) _lastSourceConvergence = now @@ -371,7 +382,13 @@ internal class FlowConsumerContextImpl( logic.onConverge(this, now, delta) } } catch (cause: Throwable) { + // Invoke the finish callbacks doFailSource(now, cause) + + // Mark the connection as closed + _flags = (_flags and ConnState.inv()) or ConnClosed + _demand = 0.0 + _deadline = Long.MAX_VALUE } } @@ -386,10 +403,6 @@ internal class FlowConsumerContextImpl( doFinishConsumer(now, null) } catch (cause: Throwable) { doFinishConsumer(now, cause) - return - } finally { - _deadline = Long.MAX_VALUE - _demand = 0.0 } } @@ -402,9 +415,6 @@ internal class FlowConsumerContextImpl( } catch (e: Throwable) { e.addSuppressed(cause) doFinishConsumer(now, e) - } finally { - _deadline = Long.MAX_VALUE - _demand = 0.0 } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 22f6516d..c6aa94e2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -38,7 +38,7 @@ import kotlin.math.min */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - private val parent: FlowConvergenceListener? = null, + parent: FlowConvergenceListener? = null, private val interferenceDomain: InterferenceDomain? = null ) : FlowMultiplexer { /** @@ -47,7 +47,6 @@ public class MaxMinFlowMultiplexer( override val inputs: Set get() = _inputs private val _inputs = mutableSetOf() - private val _activeInputs = mutableListOf() /** * The outputs of the multiplexer. @@ -55,55 +54,38 @@ public class MaxMinFlowMultiplexer( override val outputs: Set get() = _outputs private val _outputs = mutableSetOf() - private val _activeOutputs = mutableListOf() /** * The flow counters of this multiplexer. */ public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() + get() = scheduler.counters /** * The actual processing rate of the multiplexer. */ public override val rate: Double - get() = _rate - private var _rate = 0.0 + get() = scheduler.rate /** * The demanded processing rate of the input. */ public override val demand: Double - get() = _demand - private var _demand = 0.0 + get() = scheduler.demand /** * The capacity of the outputs. */ public override val capacity: Double - get() = _capacity - private var _capacity = 0.0 + get() = scheduler.capacity /** - * Flag to indicate that the scheduler is active. + * The [Scheduler] instance of this multiplexer. */ - private var _schedulerActive = false - private var _lastSchedulerCycle = Long.MAX_VALUE - - /** - * The last convergence timestamp and the input. - */ - private var _lastConverge: Long = Long.MIN_VALUE - private var _lastConvergeInput: Input? = null - - /** - * An [Output] that is used to activate the scheduler. - */ - private var _activationOutput: Output? = null + private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(_capacity, key) + val provider = Input(engine, scheduler, interferenceDomain, key) _inputs.add(provider) return provider } @@ -117,7 +99,7 @@ public class MaxMinFlowMultiplexer( } override fun newOutput(): FlowSource { - val output = Output() + val output = Output(scheduler) _outputs.add(output) return output } @@ -151,185 +133,330 @@ public class MaxMinFlowMultiplexer( } /** - * Trigger the scheduler of the multiplexer. - * - * @param now The current virtual timestamp of the simulation. + * Helper class containing the scheduler state. */ - private fun triggerScheduler(now: Long) { - if (_schedulerActive) { - // No need to trigger the scheduler in case it is already active - return - } + private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + /** + * The flow counters of this scheduler. + */ + @JvmField val counters = FlowCountersImpl() - val activationOutput = _activationOutput + /** + * The flow rate of the multiplexer. + */ + @JvmField var rate = 0.0 - // We can run the scheduler in two ways: - // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input - // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. - // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only - // a few inputs and little changes at the same timestamp. - // We always pick for option (1) unless there are no outputs available. - if (activationOutput != null) { - activationOutput.pull() - return - } else { - runScheduler(now) - } - } + /** + * The demand for the multiplexer. + */ + @JvmField var demand = 0.0 - /** - * Synchronously run the scheduler of the multiplexer. - */ - private fun runScheduler(now: Long): Long { - val lastSchedulerCycle = _lastSchedulerCycle - _lastSchedulerCycle = now + /** + * The capacity of the multiplexer. + */ + @JvmField var capacity = 0.0 + + /** + * An [Output] that is used to activate the scheduler. + */ + @JvmField var activationOutput: Output? = null - val delta = max(0, now - lastSchedulerCycle) + /** + * The active inputs registered with the scheduler. + */ + private val _activeInputs = mutableListOf() - return try { - _schedulerActive = true - doSchedule(delta) - } finally { - _schedulerActive = false + /** + * The active outputs registered with the scheduler. + */ + private val _activeOutputs = mutableListOf() + + /** + * Flag to indicate that the scheduler is active. + */ + private var _schedulerActive = false + private var _lastSchedulerCycle = Long.MAX_VALUE + + /** + * The last convergence timestamp and the input. + */ + private var _lastConverge: Long = Long.MIN_VALUE + private var _lastConvergeInput: Input? = null + + /** + * Register the specified [input] to this scheduler. + */ + fun registerInput(input: Input) { + _activeInputs.add(input) + + val hasActivationOutput = activationOutput != null + + // Disable timers and convergence of the source if one of the output manages it + input.shouldConsumerConverge = !hasActivationOutput + input.enableTimers = !hasActivationOutput + input.capacity = capacity + trigger(engine.clock.millis()) } - } - /** - * Schedule the inputs over the outputs. - * - * @return The deadline after which a new scheduling cycle should start. - */ - private fun doSchedule(delta: Long): Long { - val activeInputs = _activeInputs - val activeOutputs = _activeOutputs - - // Update the counters of the scheduler - updateCounters(delta) - - // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { - _demand = 0.0 - _rate = 0.0 - return Long.MAX_VALUE + /** + * De-register the specified [input] from this scheduler. + */ + fun deregisterInput(input: Input, now: Long) { + // Assign a new input responsible for handling the convergence events + if (_lastConvergeInput == input) { + _lastConvergeInput = null + } + + // Re-run scheduler to distribute new load + trigger(now) } - val capacity = _capacity - var availableCapacity = capacity + /** + * This method is invoked when one of the inputs converges. + */ + fun convergeInput(input: Input, now: Long) { - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { - input.pullSync() + val lastConverge = _lastConverge + val lastConvergeInput = _lastConvergeInput + val parent = parent + + if (parent != null && (now > lastConverge || lastConvergeInput == null || lastConvergeInput == input)) { + _lastConverge = now + _lastConvergeInput = input - // Remove outputs that have finished - if (!input.isActive) { - input.actualRate = 0.0 - inputIterator.remove() + parent.onConverge(now, max(0, now - lastConverge)) } } - var demand = 0.0 - var deadline = Long.MAX_VALUE + /** + * Register the specified [output] to this scheduler. + */ + fun registerOutput(output: Output) { + _activeOutputs.add(output) - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + updateCapacity() + updateActivationOutput() + } - // Divide the available output capacity fairly over the inputs using max-min fair sharing - var remaining = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / remaining-- - val grantedRate = min(input.allowedRate, availableShare) + /** + * De-register the specified [output] from this scheduler. + */ + fun deregisterOutput(output: Output, now: Long) { + _activeOutputs.remove(output) + updateCapacity() - demand += input.limit - deadline = min(deadline, input.deadline) - availableCapacity -= grantedRate + trigger(now) + } + + /** + * This method is invoked when one of the outputs converges. + */ + fun convergeOutput(output: Output, now: Long) { + val lastConverge = _lastConverge + val parent = parent + + if (parent != null) { + _lastConverge = now + + parent.onConverge(now, max(0, now - lastConverge)) + } - input.actualRate = grantedRate + if (!output.isActive) { + output.isActivationOutput = false + updateActivationOutput() + } } - val rate = capacity - availableCapacity + /** + * Trigger the scheduler of the multiplexer. + * + * @param now The current virtual timestamp of the simulation. + */ + fun trigger(now: Long) { + if (_schedulerActive) { + // No need to trigger the scheduler in case it is already active + return + } - _demand = demand - _rate = rate + val activationOutput = activationOutput - // Sort all consumers by their capacity - activeOutputs.sort() + // We can run the scheduler in two ways: + // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input + // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. + // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only + // a few inputs and little changes at the same timestamp. + // We always pick for option (1) unless there are no outputs available. + if (activationOutput != null) { + activationOutput.pull() + return + } else { + runScheduler(now) + } + } + + /** + * Synchronously run the scheduler of the multiplexer. + */ + fun runScheduler(now: Long): Long { + val lastSchedulerCycle = _lastSchedulerCycle + _lastSchedulerCycle = now - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction + val delta = max(0, now - lastSchedulerCycle) - output.push(grantedSpeed) + return try { + _schedulerActive = true + doRunScheduler(delta) + } finally { + _schedulerActive = false + } } - return deadline - } + /** + * Recompute the capacity of the multiplexer. + */ + fun updateCapacity() { + val newCapacity = _activeOutputs.sumOf(Output::capacity) - /** - * Recompute the capacity of the multiplexer. - */ - private fun updateCapacity() { - val newCapacity = _activeOutputs.sumOf(Output::capacity) + // No-op if the capacity is unchanged + if (capacity == newCapacity) { + return + } - // No-op if the capacity is unchanged - if (_capacity == newCapacity) { - return + capacity = newCapacity + + for (input in _activeInputs) { + input.capacity = newCapacity + } } - _capacity = newCapacity + /** + * Updates the output that is used for scheduler activation. + */ + private fun updateActivationOutput() { + val output = _activeOutputs.firstOrNull() + activationOutput = output - for (input in _inputs) { - input.capacity = newCapacity + if (output != null) { + output.isActivationOutput = true + } + + val hasActivationOutput = output != null + + for (input in _activeInputs) { + input.shouldConsumerConverge = !hasActivationOutput + input.enableTimers = !hasActivationOutput + } } - } - /** - * The previous capacity of the multiplexer. - */ - private var _previousCapacity = 0.0 + /** + * Schedule the inputs over the outputs. + * + * @return The deadline after which a new scheduling cycle should start. + */ + private fun doRunScheduler(delta: Long): Long { + val activeInputs = _activeInputs + val activeOutputs = _activeOutputs + + // Update the counters of the scheduler + updateCounters(delta) + + // If there is no work yet, mark the inputs as idle. + if (activeInputs.isEmpty()) { + demand = 0.0 + rate = 0.0 + return Long.MAX_VALUE + } - /** - * Update the counters of the scheduler. - */ - private fun updateCounters(delta: Long) { - val previousCapacity = _previousCapacity - _previousCapacity = _capacity + val capacity = capacity + var availableCapacity = capacity + + // Pull in the work of the outputs + val inputIterator = activeInputs.listIterator() + for (input in inputIterator) { + input.pullSync() + + // Remove outputs that have finished + if (!input.isActive) { + input.actualRate = 0.0 + inputIterator.remove() + } + } - if (delta <= 0) { - return + var demand = 0.0 + var deadline = Long.MAX_VALUE + + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() + + // Divide the available output capacity fairly over the inputs using max-min fair sharing + val size = activeInputs.size + for (i in activeInputs.indices) { + val input = activeInputs[i] + val availableShare = availableCapacity / (size - i) + val grantedRate = min(input.allowedRate, availableShare) + + demand += input.limit + deadline = min(deadline, input.deadline) + availableCapacity -= grantedRate + + input.actualRate = grantedRate + } + + val rate = capacity - availableCapacity + + this.demand = demand + this.rate = rate + + // Sort all consumers by their capacity + activeOutputs.sort() + + // Divide the requests over the available capacity of the input resources fairly + for (i in activeOutputs.indices) { + val output = activeOutputs[i] + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } + + return deadline } - val deltaS = delta / 1000.0 + /** + * The previous capacity of the multiplexer. + */ + private var _previousCapacity = 0.0 - _counters.demand += _demand * deltaS - _counters.actual += _rate * deltaS - _counters.remaining += (previousCapacity - _rate) * deltaS - } + /** + * Update the counters of the scheduler. + */ + private fun updateCounters(delta: Long) { + val previousCapacity = _previousCapacity + _previousCapacity = capacity - /** - * Updates the output that is used for scheduler activation. - */ - private fun updateActivationOutput() { - val output = _activeOutputs.firstOrNull() - _activationOutput = output + if (delta <= 0) { + return + } + + val deltaS = delta / 1000.0 - for (input in _activeInputs) { - input.enableTimers = output == null + counters.demand += demand * deltaS + counters.actual += rate * deltaS + counters.remaining += (previousCapacity - rate) * deltaS } } /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ - private inner class Input(capacity: Double, val key: InterferenceKey?) : - AbstractFlowConsumer(engine, capacity), - FlowConsumerLogic, - Comparable { + private class Input( + engine: FlowEngine, + private val scheduler: Scheduler, + private val interferenceDomain: InterferenceDomain?, + @JvmField val key: InterferenceKey? + ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable { /** * The requested limit. */ @@ -340,18 +467,18 @@ public class MaxMinFlowMultiplexer( */ @JvmField var actualRate: Double = 0.0 - /** - * The processing rate that is allowed by the model constraints. - */ - val allowedRate: Double - get() = min(capacity, limit) - /** * The deadline of the input. */ val deadline: Long get() = ctx?.deadline ?: Long.MAX_VALUE + /** + * The processing rate that is allowed by the model constraints. + */ + val allowedRate: Double + get() = min(capacity, limit) + /** * A flag to enable timers for the input. */ @@ -362,14 +489,18 @@ public class MaxMinFlowMultiplexer( } /** - * A flag to indicate that the input is closed. + * A flag to control whether the input should converge. */ - private var _isClosed: Boolean = false + var shouldConsumerConverge: Boolean = true + set(value) { + field = value + ctx?.shouldConsumerConverge = value + } /** - * The interference domain this input belongs to. + * A flag to indicate that the input is closed. */ - private val interferenceDomain = this@MaxMinFlowMultiplexer.interferenceDomain + private var _isClosed: Boolean = false /** * Close the input. @@ -386,17 +517,8 @@ public class MaxMinFlowMultiplexer( override fun start(ctx: FlowConsumerContext) { check(!_isClosed) { "Cannot re-use closed input" } - - _activeInputs += this - - if (parent != null) { - ctx.shouldConsumerConverge = true - } - enableTimers = _activationOutput == null // Disable timers of the source if one of the output manages it - + scheduler.registerInput(this) super.start(ctx) - - triggerScheduler(engine.clock.millis()) } /* FlowConsumerLogic */ @@ -411,19 +533,7 @@ public class MaxMinFlowMultiplexer( actualRate = 0.0 limit = rate - triggerScheduler(now) - } - - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - val lastConverge = _lastConverge - val parent = parent - - if (parent != null && (lastConverge < now || _lastConvergeInput == null)) { - _lastConverge = now - _lastConvergeInput = this - - parent.onConverge(now, max(0, now - lastConverge)) - } + scheduler.trigger(now) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { @@ -432,18 +542,16 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 - // Assign a new input responsible for handling the convergence events - if (_lastConvergeInput == this) { - _lastConvergeInput = null - } - - // Re-run scheduler to distribute new load - triggerScheduler(now) + scheduler.deregisterInput(this, now) // BUG: Cancel the connection so that `ctx` is set to `null` cancel() } + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + scheduler.convergeInput(this, now) + } + /* Comparable */ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) @@ -464,7 +572,7 @@ public class MaxMinFlowMultiplexer( // Compute the performance penalty due to flow interference val perfScore = if (interferenceDomain != null) { - val load = _rate / _capacity + val load = scheduler.rate / scheduler.capacity interferenceDomain.apply(key, load) } else { 1.0 @@ -477,14 +585,14 @@ public class MaxMinFlowMultiplexer( updateCounters(demand, actual, remaining) - _counters.interference += actual * max(0.0, 1 - perfScore) + scheduler.counters.interference += actual * max(0.0, 1 - perfScore) } } /** * An internal [FlowSource] implementation for multiplexer outputs. */ - private inner class Output : FlowSource, Comparable { + private class Output(private val scheduler: Scheduler) : FlowSource, Comparable { /** * The active [FlowConnection] of this source. */ @@ -495,6 +603,22 @@ public class MaxMinFlowMultiplexer( */ @JvmField var capacity: Double = 0.0 + /** + * A flag to indicate that this output is the activation output. + */ + var isActivationOutput: Boolean + get() = _isActivationOutput + set(value) { + _isActivationOutput = value + _conn?.shouldSourceConverge = value + } + private var _isActivationOutput: Boolean = false + + /** + * A flag to indicate that the output is active. + */ + @JvmField var isActive = false + /** * Push the specified rate to the consumer. */ @@ -520,33 +644,29 @@ public class MaxMinFlowMultiplexer( assert(_conn == null) { "Source running concurrently" } _conn = conn capacity = conn.capacity - _activeOutputs.add(this) + isActive = true - updateCapacity() - updateActivationOutput() + scheduler.registerOutput(this) } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { _conn = null capacity = 0.0 - _activeOutputs.remove(this) + isActive = false - updateCapacity() - updateActivationOutput() - - triggerScheduler(now) + scheduler.deregisterOutput(this, now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { val capacity = capacity if (capacity != conn.capacity) { this.capacity = capacity - updateCapacity() + scheduler.updateCapacity() } - return if (_activationOutput == this) { + return if (_isActivationOutput) { // If this output is the activation output, synchronously run the scheduler and return the new deadline - val deadline = runScheduler(now) + val deadline = scheduler.runScheduler(now) if (deadline == Long.MAX_VALUE) deadline else @@ -554,11 +674,17 @@ public class MaxMinFlowMultiplexer( } else { // Output is not the activation output, so trigger activation output and do not install timer for this // output (by returning `Long.MAX_VALUE`) - triggerScheduler(now) + scheduler.trigger(now) Long.MAX_VALUE } } + override fun onConverge(conn: FlowConnection, now: Long, delta: Long) { + if (_isActivationOutput) { + scheduler.convergeOutput(this, now) + } + } + override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) } } -- cgit v1.2.3 From bb5da0b8c3f6cea938b0630048af737ee05913ce Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 Oct 2021 12:57:56 +0200 Subject: perf(simulator): Ignore sync pulls without changes --- .../org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index f15d7fb0..9a568897 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -182,7 +182,11 @@ internal class FlowConsumerContextImpl( return } - engine.scheduleSync(_clock.millis(), this) + val now = _clock.millis() + + if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { + engine.scheduleSync(now, this) + } } override fun push(rate: Double) { -- cgit v1.2.3 From a6eec366f2a171f112a94d4ed50fe2c6521792a5 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 5 Oct 2021 17:06:09 +0200 Subject: perf(simulator): Only sort outputs on capacity change This change removes the sorting step for the outputs in the scheduling procedure for the max min multiplexer. This step is only necessary when the capacity of one of the outputs changes, which does not happen often. --- .../kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index c6aa94e2..97059e93 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -327,6 +327,9 @@ public class MaxMinFlowMultiplexer( for (input in _activeInputs) { input.capacity = newCapacity } + + // Sort outputs by their capacity + _activeOutputs.sort() } /** @@ -408,9 +411,6 @@ public class MaxMinFlowMultiplexer( this.demand = demand this.rate = rate - // Sort all consumers by their capacity - activeOutputs.sort() - // Divide the requests over the available capacity of the input resources fairly for (i in activeOutputs.indices) { val output = activeOutputs[i] -- cgit v1.2.3