summaryrefslogtreecommitdiff
path: root/opendc-simulator/opendc-simulator-flow/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-04 21:50:38 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-10-05 15:03:03 +0200
commit7c260ab0b083488b8855f61648548a40401cf62e (patch)
tree3a182c53b3bf554197f70df854152b95c1838102 /opendc-simulator/opendc-simulator-flow/src/main
parent557797c63c19e80c908eccc96472f215eab0c2f3 (diff)
perf(simulator): Manage deadlines centrally in max min mux
This change updates the MaxMinFlowMultiplexer implementation to centrally manage the deadlines of the `FlowSource`s as opposed to each source using its own timers. For large amounts of inputs, this is much faster as the multiplexer already needs to traverse each input on a timer expiration of an input.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src/main')
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt14
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt1
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt45
-rw-r--r--opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt140
4 files changed, 152 insertions, 48 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
index 15f9b93b..d7182497 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt
@@ -29,6 +29,11 @@ package org.opendc.simulator.flow
*/
public interface FlowConsumerContext : FlowConnection {
/**
+ * The deadline of the source.
+ */
+ public val deadline: Long
+
+ /**
* The capacity of the connection.
*/
public override var capacity: Double
@@ -39,12 +44,17 @@ public interface FlowConsumerContext : FlowConnection {
public var shouldConsumerConverge: Boolean
/**
+ * A flag to control whether the timers for the [FlowSource] should be enabled.
+ */
+ public var enableTimers: Boolean
+
+ /**
* Start the flow over the connection.
*/
public fun start()
/**
- * Synchronously flush the changes of the connection.
+ * Synchronously pull the source of the connection.
*/
- public fun flush()
+ public fun pullSync()
}
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
index 81ed9f26..939c5c98 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Flags.kt
@@ -41,3 +41,4 @@ internal const val ConnUpdateSkipped = 1 shl 6 // An update of the connection wa
internal const val ConnConvergePending = 1 shl 7 // Indication that a convergence is already pending
internal const val ConnConvergeSource = 1 shl 8 // Enable convergence of the source
internal const val ConnConvergeConsumer = 1 shl 9 // Enable convergence of the consumer
+internal const val ConnDisableTimers = 1 shl 10 // Disable timers for the source
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
index 9d36483e..c7a8c3de 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt
@@ -69,23 +69,30 @@ internal class FlowConsumerContextImpl(
*/
override val demand: Double
get() = _demand
+ private var _demand: Double = 0.0 // The current (pending) demand of the source
+
+ /**
+ * The deadline of the source.
+ */
+ override val deadline: Long
+ get() = _deadline
+ private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
/**
* Flags to control the convergence of the consumer and source.
*/
- override var shouldSourceConverge: Boolean = false
+ override var shouldSourceConverge: Boolean
+ get() = _flags and ConnConvergeSource == ConnConvergeSource
set(value) {
- field = value
_flags =
if (value)
_flags or ConnConvergeSource
else
_flags and ConnConvergeSource.inv()
}
- override var shouldConsumerConverge: Boolean = false
+ override var shouldConsumerConverge: Boolean
+ get() = _flags and ConnConvergeConsumer == ConnConvergeConsumer
set(value) {
- field = value
-
_flags =
if (value)
_flags or ConnConvergeConsumer
@@ -94,15 +101,22 @@ internal class FlowConsumerContextImpl(
}
/**
- * The clock to track simulation time.
+ * Flag to control the timers on the [FlowSource]
*/
- private val _clock = engine.clock
+ override var enableTimers: Boolean
+ get() = _flags and ConnDisableTimers != ConnDisableTimers
+ set(value) {
+ _flags =
+ if (!value)
+ _flags or ConnDisableTimers
+ else
+ _flags and ConnDisableTimers.inv()
+ }
/**
- * The current state of the connection.
+ * The clock to track simulation time.
*/
- private var _demand: Double = 0.0 // The current (pending) demand of the source
- private var _deadline: Long = Long.MAX_VALUE // The deadline of the source's timer
+ private val _clock = engine.clock
/**
* The flags of the flow connection, indicating certain actions.
@@ -166,7 +180,7 @@ internal class FlowConsumerContextImpl(
scheduleImmediate(_clock.millis(), flags or ConnPulled)
}
- override fun flush() {
+ override fun pullSync() {
val flags = _flags
// Do not attempt to flush the connection if the connection is closed or an update is already active
@@ -308,8 +322,13 @@ internal class FlowConsumerContextImpl(
// Check whether we need to schedule a new timer for this connection. That is the case when:
// (1) The deadline is valid (not the maximum value)
// (2) The connection is active
- // (3) The current active timer for the connection points to a later deadline
- if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || (timer != null && newDeadline >= timer.target)) {
+ // (3) Timers are not disabled for the source
+ // (4) The current active timer for the connection points to a later deadline
+ if (newDeadline == Long.MAX_VALUE ||
+ flags and ConnState != ConnActive ||
+ flags and ConnDisableTimers != 0 ||
+ (timer != null && newDeadline >= timer.target)
+ ) {
// Ignore any deadline scheduled at the maximum value
// This indicates that the source does not want to register a timer
return
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
index 5ff0fb8d..22f6516d 100644
--- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
+++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt
@@ -97,6 +97,11 @@ public class MaxMinFlowMultiplexer(
private var _lastConverge: Long = Long.MIN_VALUE
private var _lastConvergeInput: Input? = null
+ /**
+ * An [Output] that is used to activate the scheduler.
+ */
+ private var _activationOutput: Output? = null
+
override fun newInput(key: InterferenceKey?): FlowConsumer {
val provider = Input(_capacity, key)
_inputs.add(provider)
@@ -146,19 +151,44 @@ public class MaxMinFlowMultiplexer(
}
/**
- * Converge the scheduler of the multiplexer.
+ * Trigger the scheduler of the multiplexer.
+ *
+ * @param now The current virtual timestamp of the simulation.
*/
- private fun runScheduler(now: Long) {
+ private fun triggerScheduler(now: Long) {
if (_schedulerActive) {
+ // No need to trigger the scheduler in case it is already active
+ return
+ }
+
+ val activationOutput = _activationOutput
+
+ // We can run the scheduler in two ways:
+ // (1) We can pull one of the multiplexer's outputs. This allows us to cascade multiple pushes by the input
+ // into a single scheduling cycle, but is slower in case of a few changes at the same timestamp.
+ // (2) We run the scheduler directly from this method call. This is the fastest approach when there are only
+ // a few inputs and little changes at the same timestamp.
+ // We always pick for option (1) unless there are no outputs available.
+ if (activationOutput != null) {
+ activationOutput.pull()
return
+ } else {
+ runScheduler(now)
}
+ }
+
+ /**
+ * Synchronously run the scheduler of the multiplexer.
+ */
+ private fun runScheduler(now: Long): Long {
val lastSchedulerCycle = _lastSchedulerCycle
- val delta = max(0, now - lastSchedulerCycle)
- _schedulerActive = true
_lastSchedulerCycle = now
- try {
- doSchedule(now, delta)
+ val delta = max(0, now - lastSchedulerCycle)
+
+ return try {
+ _schedulerActive = true
+ doSchedule(delta)
} finally {
_schedulerActive = false
}
@@ -166,8 +196,10 @@ public class MaxMinFlowMultiplexer(
/**
* Schedule the inputs over the outputs.
+ *
+ * @return The deadline after which a new scheduling cycle should start.
*/
- private fun doSchedule(now: Long, delta: Long) {
+ private fun doSchedule(delta: Long): Long {
val activeInputs = _activeInputs
val activeOutputs = _activeOutputs
@@ -178,7 +210,7 @@ public class MaxMinFlowMultiplexer(
if (activeInputs.isEmpty()) {
_demand = 0.0
_rate = 0.0
- return
+ return Long.MAX_VALUE
}
val capacity = _capacity
@@ -187,7 +219,7 @@ public class MaxMinFlowMultiplexer(
// Pull in the work of the outputs
val inputIterator = activeInputs.listIterator()
for (input in inputIterator) {
- input.pull(now)
+ input.pullSync()
// Remove outputs that have finished
if (!input.isActive) {
@@ -197,6 +229,7 @@ public class MaxMinFlowMultiplexer(
}
var demand = 0.0
+ var deadline = Long.MAX_VALUE
// Sort in-place the inputs based on their pushed flow.
// Profiling shows that it is faster than maintaining some kind of sorted set.
@@ -209,15 +242,11 @@ public class MaxMinFlowMultiplexer(
val availableShare = availableCapacity / remaining--
val grantedRate = min(input.allowedRate, availableShare)
- // Ignore empty sources
- if (grantedRate <= 0.0) {
- input.actualRate = 0.0
- continue
- }
-
- input.actualRate = grantedRate
demand += input.limit
+ deadline = min(deadline, input.deadline)
availableCapacity -= grantedRate
+
+ input.actualRate = grantedRate
}
val rate = capacity - availableCapacity
@@ -237,6 +266,8 @@ public class MaxMinFlowMultiplexer(
output.push(grantedSpeed)
}
+
+ return deadline
}
/**
@@ -281,6 +312,18 @@ public class MaxMinFlowMultiplexer(
}
/**
+ * Updates the output that is used for scheduler activation.
+ */
+ private fun updateActivationOutput() {
+ val output = _activeOutputs.firstOrNull()
+ _activationOutput = output
+
+ for (input in _activeInputs) {
+ input.enableTimers = output == null
+ }
+ }
+
+ /**
* An internal [FlowConsumer] implementation for multiplexer inputs.
*/
private inner class Input(capacity: Double, val key: InterferenceKey?) :
@@ -304,14 +347,24 @@ public class MaxMinFlowMultiplexer(
get() = min(capacity, limit)
/**
- * A flag to indicate that the input is closed.
+ * The deadline of the input.
*/
- private var _isClosed: Boolean = false
+ val deadline: Long
+ get() = ctx?.deadline ?: Long.MAX_VALUE
+
+ /**
+ * A flag to enable timers for the input.
+ */
+ var enableTimers: Boolean = true
+ set(value) {
+ field = value
+ ctx?.enableTimers = value
+ }
/**
- * The timestamp at which we received the last command.
+ * A flag to indicate that the input is closed.
*/
- private var _lastPull: Long = Long.MIN_VALUE
+ private var _isClosed: Boolean = false
/**
* The interference domain this input belongs to.
@@ -335,11 +388,15 @@ public class MaxMinFlowMultiplexer(
check(!_isClosed) { "Cannot re-use closed input" }
_activeInputs += this
+
if (parent != null) {
ctx.shouldConsumerConverge = true
}
+ enableTimers = _activationOutput == null // Disable timers of the source if one of the output manages it
super.start(ctx)
+
+ triggerScheduler(engine.clock.millis())
}
/* FlowConsumerLogic */
@@ -353,9 +410,8 @@ public class MaxMinFlowMultiplexer(
actualRate = 0.0
limit = rate
- _lastPull = now
- runScheduler(now)
+ triggerScheduler(now)
}
override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) {
@@ -375,7 +431,6 @@ public class MaxMinFlowMultiplexer(
limit = 0.0
actualRate = 0.0
- _lastPull = now
// Assign a new input responsible for handling the convergence events
if (_lastConvergeInput == this) {
@@ -383,7 +438,10 @@ public class MaxMinFlowMultiplexer(
}
// Re-run scheduler to distribute new load
- runScheduler(now)
+ triggerScheduler(now)
+
+ // BUG: Cancel the connection so that `ctx` is set to `null`
+ cancel()
}
/* Comparable */
@@ -392,11 +450,8 @@ public class MaxMinFlowMultiplexer(
/**
* Pull the source if necessary.
*/
- fun pull(now: Long) {
- val ctx = ctx
- if (ctx != null && _lastPull < now) {
- ctx.flush()
- }
+ fun pullSync() {
+ ctx?.pullSync()
}
/**
@@ -454,6 +509,13 @@ public class MaxMinFlowMultiplexer(
_conn?.close()
}
+ /**
+ * Pull this output.
+ */
+ fun pull() {
+ _conn?.pull()
+ }
+
override fun onStart(conn: FlowConnection, now: Long) {
assert(_conn == null) { "Source running concurrently" }
_conn = conn
@@ -461,6 +523,7 @@ public class MaxMinFlowMultiplexer(
_activeOutputs.add(this)
updateCapacity()
+ updateActivationOutput()
}
override fun onStop(conn: FlowConnection, now: Long, delta: Long) {
@@ -469,8 +532,9 @@ public class MaxMinFlowMultiplexer(
_activeOutputs.remove(this)
updateCapacity()
+ updateActivationOutput()
- runScheduler(now)
+ triggerScheduler(now)
}
override fun onPull(conn: FlowConnection, now: Long, delta: Long): Long {
@@ -480,9 +544,19 @@ public class MaxMinFlowMultiplexer(
updateCapacity()
}
- // Re-run scheduler to distribute new load
- runScheduler(now)
- return Long.MAX_VALUE
+ return if (_activationOutput == this) {
+ // If this output is the activation output, synchronously run the scheduler and return the new deadline
+ val deadline = runScheduler(now)
+ if (deadline == Long.MAX_VALUE)
+ deadline
+ else
+ deadline - now
+ } else {
+ // Output is not the activation output, so trigger activation output and do not install timer for this
+ // output (by returning `Long.MAX_VALUE`)
+ triggerScheduler(now)
+ Long.MAX_VALUE
+ }
}
override fun compareTo(other: Output): Int = capacity.compareTo(other.capacity)