summaryrefslogtreecommitdiff
path: root/simulator/opendc-utils/src/main/kotlin
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-07 21:59:13 +0200
committerGitHub <noreply@github.com>2021-04-07 21:59:13 +0200
commit5d3b759b18fb0a4278b43dea6a9db478b07804a5 (patch)
tree419dedda10f6a1f1865fbee4d1f546dd8876c940 /simulator/opendc-utils/src/main/kotlin
parent519141f9af525a853b40eb821e70ca209bc104bf (diff)
parent3d707674ddfa96ae5c090a7c918350b0bef9b50f (diff)
simulator: Optimize bottlenecks in resource layer
This pull request addresses several bottlenecks that were present in the `opendc-simulator-resources` layer and `TimerScheduler`. These changes result into a 4x performance improvement for the energy experiments we are currently doing. * The use of `StateFlow` has been removed where possible. Profiling shows that emitting changes to `StateFlow` becomes a bottleneck in a single-thread context. * `SimSpeedConsumerAdapter` is an alternative for obtaining the changes in speed of a resource. **Breaking API Changes** * `SimResourceSource` does not expose `speed` as `StateFlow` anymore. To monitor speed changes, use `SimSpeedConsumerAdapter`. * Power draw in `SimBareMetalMachine` is not exposed as `StateFlow` anymore.
Diffstat (limited to 'simulator/opendc-utils/src/main/kotlin')
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt88
1 files changed, 51 insertions, 37 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index d4bc7b5c..aa2f3367 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -34,8 +34,8 @@ import kotlin.math.max
/**
* A TimerScheduler facilitates scheduled execution of future tasks.
*
- * @property context The [CoroutineContext] to run the tasks with.
- * @property clock The clock to keep track of the time.
+ * @param context The [CoroutineContext] to run the tasks with.
+ * @param clock The clock to keep track of the time.
*/
@OptIn(ExperimentalCoroutinesApi::class)
public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable {
@@ -60,45 +60,51 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
private val channel = Channel<Long?>(Channel.CONFLATED)
/**
- * The scheduling job.
+ * A flag to indicate that the scheduler is active.
*/
- private val job = scope.launch {
- val timers = timers
- val queue = queue
- val clock = clock
- var next: Long? = channel.receive()
+ private var isActive = true
- while (true) {
- next = select {
- channel.onReceive { it }
+ init {
+ scope.launch {
+ val timers = timers
+ val queue = queue
+ val clock = clock
+ val job = requireNotNull(coroutineContext[Job])
+ val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
+ var next: Long? = channel.receive()
- val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
+ while (true) {
+ next = select {
+ channel.onReceive { it }
- onTimeout(delay) {
- while (queue.isNotEmpty() && isActive) {
- val timer = queue.peek()
- val timestamp = clock.millis()
+ val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
- assert(timer.timestamp >= timestamp) { "Found task in the past" }
+ onTimeout(delay) {
+ while (queue.isNotEmpty() && job.isActive) {
+ val timer = queue.peek()
+ val timestamp = clock.millis()
- if (timer.timestamp > timestamp && !timer.isCancelled) {
- // Schedule a task for the next event to occur.
- return@onTimeout timer.timestamp
- }
+ assert(timer.timestamp >= timestamp) { "Found task in the past" }
+
+ if (timer.timestamp > timestamp && !timer.isCancelled) {
+ // Schedule a task for the next event to occur.
+ return@onTimeout timer.timestamp
+ }
- queue.poll()
+ queue.poll()
- if (!timer.isCancelled) {
- timers.remove(timer.key)
- try {
- timer()
- } catch (e: Throwable) {
- coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e)
+ if (!timer.isCancelled) {
+ timers.remove(timer.key)
+ try {
+ timer()
+ } catch (e: Throwable) {
+ exceptionHandler?.handleException(coroutineContext, e)
+ }
}
}
- }
- null
+ null
+ }
}
}
}
@@ -108,6 +114,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
* Stop the scheduler.
*/
override fun close() {
+ isActive = false
cancelAll()
scope.cancel()
}
@@ -121,7 +128,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
* @param key The key of the timer to cancel.
*/
public fun cancel(key: T) {
- if (!job.isActive) {
+ if (!isActive) {
return
}
@@ -131,11 +138,14 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
timer?.isCancelled = true
// Optimization: check whether we are the head of the queue
- if (queue.peek() == timer) {
+ val queue = queue
+ val channel = channel
+ val peek = queue.peek()
+ if (peek == timer) {
queue.poll()
if (queue.isNotEmpty()) {
- channel.sendBlocking(queue.peek().timestamp)
+ channel.sendBlocking(peek.timestamp)
} else {
channel.sendBlocking(null)
}
@@ -182,12 +192,14 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
*/
public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
val now = clock.millis()
+ val queue = queue
+ val channel = channel
require(timestamp >= now) { "Timestamp must be in the future" }
- check(job.isActive) { "Timer is stopped" }
+ check(isActive) { "Timer is stopped" }
timers.compute(key) { _, old ->
- if (old?.timestamp == timestamp) {
+ if (old != null && old.timestamp == timestamp) {
// Fast-path: timer for the same timestamp already exists
old
} else {
@@ -198,8 +210,9 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
queue.add(timer)
// Check if we need to push the interruption forward
- if (queue.peek() == timer) {
- channel.sendBlocking(timer.timestamp)
+ // Note that we check by timer reference
+ if (queue.peek() === timer) {
+ channel.offer(timer.timestamp)
}
timer
@@ -214,6 +227,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
/**
* A flag to indicate that the task has been cancelled.
*/
+ @JvmField
var isCancelled: Boolean = false
/**