diff options
5 files changed, 155 insertions, 51 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 9d540118..337d68bf 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,7 +120,7 @@ class CapelinIntegrationTest { { assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } }, { assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } }, - { assertEquals(5.840207707767459E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, + { assertEquals(5.840212485920686E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }, ) } @@ -164,7 +164,7 @@ class CapelinIntegrationTest { { assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, { assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }, - { assertEquals(7.009945802750012E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } + { assertEquals(7.0099453912813E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } } ) } @@ -213,7 +213,7 @@ class CapelinIntegrationTest { { assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } }, { assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } }, { assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } }, - { assertEquals(480866, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } + { assertEquals(481270, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index 15f9b93b..d7182497 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -29,6 +29,11 @@ package org.opendc.simulator.flow */ public interface FlowConsumerContext : FlowConnection { /** + * The deadline of the source. + */ + public val deadline: Long + + /** * The capacity of the connection. */ public override var capacity: Double @@ -39,12 +44,17 @@ public interface FlowConsumerContext : FlowConnection { public var shouldConsumerConverge: Boolean /** + * A flag to control whether the timers for the [FlowSource] should be enabled. + */ + public var enableTimers: Boolean + + /** * Start the flow over the connection. */ public fun start() /** - * Synchronously flush the changes of the connection. + * Synchronously pull the source of the connection. */ - public fun flush() + public fun pullSync() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt index 81ed9f26..939c5c98 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt @@ -41,3 +41,4 @@ internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection wa internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer +internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9d36483e..c7a8c3de 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -69,23 +69,30 @@ internal class FlowConsumerContextImpl( */ override val demand: Double get() = _demand + private var _demand: Double = 0.0 // The current (pending) demand of the source + + /** + * The deadline of the source. + */ + override val deadline: Long + get() = _deadline + private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer /** * Flags to control the convergence of the consumer and source. */ - override var shouldSourceConverge: Boolean = false + override var shouldSourceConverge: Boolean + get() = _flags and ConnConvergeSource == ConnConvergeSource set(value) { - field = value _flags = if (value) _flags or ConnConvergeSource else _flags and ConnConvergeSource.inv() } - override var shouldConsumerConverge: Boolean = false + override var shouldConsumerConverge: Boolean + get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer set(value) { - field = value - _flags = if (value) _flags or ConnConvergeConsumer @@ -94,15 +101,22 @@ internal class FlowConsumerContextImpl( } /** - * The clock to track simulation time. + * Flag to control the timers on the [FlowSource] */ - private val _clock = engine.clock + override var enableTimers: Boolean + get() = _flags and ConnDisableTimers != ConnDisableTimers + set(value) { + _flags = + if (!value) + _flags or ConnDisableTimers + else + _flags and ConnDisableTimers.inv() + } /** - * The current state of the connection. + * The clock to track simulation time. */ - private var _demand: Double = 0.0 // The current (pending) demand of the source - private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer + private val _clock = engine.clock /** * The flags of the flow connection, indicating certain actions. @@ -166,7 +180,7 @@ internal class FlowConsumerContextImpl( scheduleImmediate(_clock.millis(), flags or ConnPulled) } - override fun flush() { + override fun pullSync() { val flags = _flags // Do not attempt to flush the connection if the connection is closed or an update is already active @@ -308,8 +322,13 @@ internal class FlowConsumerContextImpl( // Check whether we need to schedule a new timer for this connection. That is the case when: // (1) The deadline is valid (not the maximum value) // (2) The connection is active - // (3) The current active timer for the connection points to a later deadline - if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) { + // (3) Timers are not disabled for the source + // (4) The current active timer for the connection points to a later deadline + if (newDeadline == Long.MAX_VALUE || + flags and ConnState != ConnActive || + flags and ConnDisableTimers != 0 || + (timer != null && newDeadline >= timer.target) + ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer return diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 5ff0fb8d..22f6516d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -97,6 +97,11 @@ public class MaxMinFlowMultiplexer( private var _lastConverge: Long = Long.MIN_VALUE private var _lastConvergeInput: Input? = null + /** + * An [Output] that is used to activate the scheduler. + */ + private var _activationOutput: Output? = null + override fun newInput(key: InterferenceKey?): FlowConsumer { val provider = Input(_capacity, key) _inputs.add(provider) @@ -146,19 +151,44 @@ public class MaxMinFlowMultiplexer( } /** - * Converge the scheduler of the multiplexer. + * Trigger the scheduler of the multiplexer. + * + * @param now The current virtual timestamp of the simulation. */ - private fun runScheduler(now: Long) { + private fun triggerScheduler(now: Long) { if (_schedulerActive) { + // No need to trigger the scheduler in case it is already active + return + } + + val activationOutput = _activationOutput + + // We can run the scheduler in two ways: + // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input + // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp. + // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only + // a few inputs and little changes at the same timestamp. + // We always pick for option (1) unless there are no outputs available. + if (activationOutput != null) { + activationOutput.pull() return + } else { + runScheduler(now) } + } + + /** + * Synchronously run the scheduler of the multiplexer. + */ + private fun runScheduler(now: Long): Long { val lastSchedulerCycle = _lastSchedulerCycle - val delta = max(0, now - lastSchedulerCycle) - _schedulerActive = true _lastSchedulerCycle = now - try { - doSchedule(now, delta) + val delta = max(0, now - lastSchedulerCycle) + + return try { + _schedulerActive = true + doSchedule(delta) } finally { _schedulerActive = false } @@ -166,8 +196,10 @@ public class MaxMinFlowMultiplexer( /** * Schedule the inputs over the outputs. + * + * @return The deadline after which a new scheduling cycle should start. */ - private fun doSchedule(now: Long, delta: Long) { + private fun doSchedule(delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs @@ -178,7 +210,7 @@ public class MaxMinFlowMultiplexer( if (activeInputs.isEmpty()) { _demand = 0.0 _rate = 0.0 - return + return Long.MAX_VALUE } val capacity = _capacity @@ -187,7 +219,7 @@ public class MaxMinFlowMultiplexer( // Pull in the work of the outputs val inputIterator = activeInputs.listIterator() for (input in inputIterator) { - input.pull(now) + input.pullSync() // Remove outputs that have finished if (!input.isActive) { @@ -197,6 +229,7 @@ public class MaxMinFlowMultiplexer( } var demand = 0.0 + var deadline = Long.MAX_VALUE // Sort in-place the inputs based on their pushed flow. // Profiling shows that it is faster than maintaining some kind of sorted set. @@ -209,15 +242,11 @@ public class MaxMinFlowMultiplexer( val availableShare = availableCapacity / remaining-- val grantedRate = min(input.allowedRate, availableShare) - // Ignore empty sources - if (grantedRate <= 0.0) { - input.actualRate = 0.0 - continue - } - - input.actualRate = grantedRate demand += input.limit + deadline = min(deadline, input.deadline) availableCapacity -= grantedRate + + input.actualRate = grantedRate } val rate = capacity - availableCapacity @@ -237,6 +266,8 @@ public class MaxMinFlowMultiplexer( output.push(grantedSpeed) } + + return deadline } /** @@ -281,6 +312,18 @@ public class MaxMinFlowMultiplexer( } /** + * Updates the output that is used for scheduler activation. + */ + private fun updateActivationOutput() { + val output = _activeOutputs.firstOrNull() + _activationOutput = output + + for (input in _activeInputs) { + input.enableTimers = output == null + } + } + + /** * An internal [FlowConsumer] implementation for multiplexer inputs. */ private inner class Input(capacity: Double, val key: InterferenceKey?) : @@ -304,14 +347,24 @@ public class MaxMinFlowMultiplexer( get() = min(capacity, limit) /** - * A flag to indicate that the input is closed. + * The deadline of the input. */ - private var _isClosed: Boolean = false + val deadline: Long + get() = ctx?.deadline ?: Long.MAX_VALUE + + /** + * A flag to enable timers for the input. + */ + var enableTimers: Boolean = true + set(value) { + field = value + ctx?.enableTimers = value + } /** - * The timestamp at which we received the last command. + * A flag to indicate that the input is closed. */ - private var _lastPull: Long = Long.MIN_VALUE + private var _isClosed: Boolean = false /** * The interference domain this input belongs to. @@ -335,11 +388,15 @@ public class MaxMinFlowMultiplexer( check(!_isClosed) { "Cannot re-use closed input" } _activeInputs += this + if (parent != null) { ctx.shouldConsumerConverge = true } + enableTimers = _activationOutput == null // Disable timers of the source if one of the output manages it super.start(ctx) + + triggerScheduler(engine.clock.millis()) } /* FlowConsumerLogic */ @@ -353,9 +410,8 @@ public class MaxMinFlowMultiplexer( actualRate = 0.0 limit = rate - _lastPull = now - runScheduler(now) + triggerScheduler(now) } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -375,7 +431,6 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 - _lastPull = now // Assign a new input responsible for handling the convergence events if (_lastConvergeInput == this) { @@ -383,7 +438,10 @@ public class MaxMinFlowMultiplexer( } // Re-run scheduler to distribute new load - runScheduler(now) + triggerScheduler(now) + + // BUG: Cancel the connection so that `ctx` is set to `null` + cancel() } /* Comparable */ @@ -392,11 +450,8 @@ public class MaxMinFlowMultiplexer( /** * Pull the source if necessary. */ - fun pull(now: Long) { - val ctx = ctx - if (ctx != null && _lastPull < now) { - ctx.flush() - } + fun pullSync() { + ctx?.pullSync() } /** @@ -454,6 +509,13 @@ public class MaxMinFlowMultiplexer( _conn?.close() } + /** + * Pull this output. + */ + fun pull() { + _conn?.pull() + } + override fun onStart(conn: FlowConnection, now: Long) { assert(_conn == null) { "Source running concurrently" } _conn = conn @@ -461,6 +523,7 @@ public class MaxMinFlowMultiplexer( _activeOutputs.add(this) updateCapacity() + updateActivationOutput() } override fun onStop(conn: FlowConnection, now: Long, delta: Long) { @@ -469,8 +532,9 @@ public class MaxMinFlowMultiplexer( _activeOutputs.remove(this) updateCapacity() + updateActivationOutput() - runScheduler(now) + triggerScheduler(now) } override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long { @@ -480,9 +544,19 @@ public class MaxMinFlowMultiplexer( updateCapacity() } - // Re-run scheduler to distribute new load - runScheduler(now) - return Long.MAX_VALUE + return if (_activationOutput == this) { + // If this output is the activation output, synchronously run the scheduler and return the new deadline + val deadline = runScheduler(now) + if (deadline == Long.MAX_VALUE) + deadline + else + deadline - now + } else { + // Output is not the activation output, so trigger activation output and do not install timer for this + // output (by returning `Long.MAX_VALUE`) + triggerScheduler(now) + Long.MAX_VALUE + } } override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity) |
