summaryrefslogtreecommitdiff
path: root/opendc-common/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-18 14:45:05 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2022-02-18 14:46:30 +0100
commit0711d4c1d6afb41ca31afbf8bd6253921d57eeb4 (patch)
tree17fc40c7e8b13bc42e4f9b49a935a67db408c1bd /opendc-common/src
parent52d35cd82905612f0ef9f7b7d88611300fb48ebe (diff)
perf(common): Optimize TimerScheduler
This change updates the TimerScheduler implementation to directly use the Delay object instead of running the timers inside a coroutine. Constructing the coroutine is more expensive, so we prefer running in a Runnable.
Diffstat (limited to 'opendc-common/src')
-rw-r--r--opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt239
-rw-r--r--opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt46
2 files changed, 134 insertions, 151 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
index 86314411..bec2c9f1 100644
--- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
+++ b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt
@@ -23,12 +23,10 @@
package org.opendc.common.util
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.selects.select
import java.time.Clock
import java.util.*
+import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
-import kotlin.math.max
/**
* A TimerScheduler facilitates scheduled execution of future tasks.
@@ -37,86 +35,83 @@ import kotlin.math.max
* @param clock The clock to keep track of the time.
*/
@OptIn(ExperimentalCoroutinesApi::class)
-public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable {
+public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: Clock) {
/**
- * The scope in which the scheduler runs.
+ * The [Delay] instance that provides scheduled execution of [Runnable]s.
*/
- private val scope = CoroutineScope(context + Job())
+ @OptIn(InternalCoroutinesApi::class)
+ private val delay =
+ requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" }
+
+ /**
+ * The stack of the invocations to occur in the future.
+ */
+ private val invocations = ArrayDeque<Invocation>()
/**
* A priority queue containing the tasks to be scheduled in the future.
*/
- private val queue = PriorityQueue<Timer>()
+ private val queue = PriorityQueue<Timer<T>>()
/**
* A map that keeps track of the timers.
*/
- private val timers = mutableMapOf<T, Timer>()
+ private val timers = mutableMapOf<T, Timer<T>>()
/**
- * The channel to communicate with the scheduling job.
+ * Start a timer that will invoke the specified [block] after [delay].
+ *
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param delay The delay before invoking the block.
+ * @param block The block to invoke.
*/
- private val channel = Channel<Long?>(Channel.CONFLATED)
+ public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
+ startSingleTimerTo(key, clock.millis() + delay, block)
+ }
/**
- * A flag to indicate that the scheduler is active.
+ * Start a timer that will invoke the specified [block] at [timestamp].
+ *
+ * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ *
+ * @param key The key of the timer to start.
+ * @param timestamp The timestamp at which to invoke the block.
+ * @param block The block to invoke.
*/
- private var isActive = true
-
- init {
- scope.launch {
- val timers = timers
- val queue = queue
- val clock = clock
- val job = requireNotNull(coroutineContext[Job])
- val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
- var next: Long? = channel.receive()
-
- while (true) {
- next = select {
- channel.onReceive { it }
-
- val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
-
- onTimeout(delay) {
- while (queue.isNotEmpty() && job.isActive) {
- val timer = queue.peek()
- val timestamp = clock.millis()
-
- assert(timer.timestamp >= timestamp) { "Found task in the past" }
-
- if (timer.timestamp > timestamp && !timer.isCancelled) {
- // Schedule a task for the next event to occur.
- return@onTimeout timer.timestamp
- }
-
- queue.poll()
-
- if (!timer.isCancelled) {
- timers.remove(timer.key)
- try {
- timer()
- } catch (e: Throwable) {
- exceptionHandler?.handleException(coroutineContext, e)
- }
- }
- }
-
- null
- }
- }
+ public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
+ val now = clock.millis()
+ val queue = queue
+ val invocations = invocations
+
+ require(timestamp >= now) { "Timestamp must be in the future" }
+
+ timers.compute(key) { _, old ->
+ if (old != null && old.timestamp == timestamp) {
+ // Fast-path: timer for the same timestamp already exists
+ old.block = block
+ old
+ } else {
+ // Slow-path: cancel old timer and replace it with new timer
+ val timer = Timer(key, timestamp, block)
+
+ old?.isCancelled = true
+ queue.add(timer)
+ trySchedule(now, invocations, timestamp)
+
+ timer
}
}
}
/**
- * Stop the scheduler.
+ * Check if a timer with a given key is active.
+ *
+ * @param key The key to check if active.
+ * @return `true` if the timer with the specified [key] is active, `false` otherwise.
*/
- override fun close() {
- isActive = false
- cancelAll()
- scope.cancel()
- }
+ public fun isTimerActive(key: T): Boolean = key in timers
/**
* Cancel a timer with a given key.
@@ -127,28 +122,10 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
* @param key The key of the timer to cancel.
*/
public fun cancel(key: T) {
- if (!isActive) {
- return
- }
-
val timer = timers.remove(key)
// Mark the timer as cancelled
timer?.isCancelled = true
-
- // Optimization: check whether we are the head of the queue
- val queue = queue
- val channel = channel
- val peek = queue.peek()
- if (peek == timer) {
- queue.poll()
-
- if (queue.isNotEmpty()) {
- channel.trySend(peek.timestamp)
- } else {
- channel.trySend(null)
- }
- }
}
/**
@@ -157,64 +134,60 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
public fun cancelAll() {
queue.clear()
timers.clear()
- }
- /**
- * Check if a timer with a given key is active.
- *
- * @param key The key to check if active.
- * @return `true` if the timer with the specified [key] is active, `false` otherwise.
- */
- public fun isTimerActive(key: T): Boolean = key in timers
+ // Cancel all pending invocations
+ for (invocation in invocations) {
+ invocation.cancel()
+ }
+ invocations.clear()
+ }
/**
- * Start a timer that will invoke the specified [block] after [delay].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
+ * Try to schedule an engine invocation at the specified [target].
*
- * @param key The key of the timer to start.
- * @param delay The delay before invoking the block.
- * @param block The block to invoke.
+ * @param now The current virtual timestamp.
+ * @param target The virtual timestamp at which the engine invocation should happen.
+ * @param scheduled The queue of scheduled invocations.
*/
- public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) {
- startSingleTimerTo(key, clock.millis() + delay, block)
+ private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) {
+ val head = scheduled.peek()
+
+ // Only schedule a new scheduler invocation in case the target is earlier than all other pending
+ // scheduler invocations
+ if (head == null || target < head.timestamp) {
+ @OptIn(InternalCoroutinesApi::class)
+ val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context)
+ scheduled.addFirst(Invocation(target, handle))
+ }
}
/**
- * Start a timer that will invoke the specified [block] at [timestamp].
- *
- * Each timer has a key and if a new timer with same key is started the previous is cancelled.
- *
- * @param key The key of the timer to start.
- * @param timestamp The timestamp at which to invoke the block.
- * @param block The block to invoke.
+ * This method is invoked when the earliest timer expires.
*/
- public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
- val now = clock.millis()
- val queue = queue
- val channel = channel
+ private fun doRunTimers() {
+ val invocations = invocations
+ val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue
+ val now = invocation.timestamp
- require(timestamp >= now) { "Timestamp must be in the future" }
- check(isActive) { "Timer is stopped" }
+ while (queue.isNotEmpty()) {
+ val timer = queue.peek()
- timers.compute(key) { _, old ->
- if (old != null && old.timestamp == timestamp) {
- // Fast-path: timer for the same timestamp already exists
- old
- } else {
- // Slow-path: cancel old timer and replace it with new timer
- val timer = Timer(key, timestamp, block)
+ val timestamp = timer.timestamp
+ val isCancelled = timer.isCancelled
- old?.isCancelled = true
- queue.add(timer)
+ assert(timestamp >= now) { "Found task in the past" }
- // Check if we need to push the interruption forward
- // Note that we check by timer reference
- if (queue.peek() === timer) {
- channel.trySend(timer.timestamp)
- }
+ if (timestamp > now && !isCancelled) {
+ // Schedule a task for the next event to occur.
+ trySchedule(now, invocations, timestamp)
+ break
+ }
- timer
+ queue.poll()
+
+ if (!isCancelled) {
+ timers.remove(timer.key)
+ timer()
}
}
}
@@ -222,7 +195,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
/**
* A task that is scheduled to run in the future.
*/
- private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable<Timer> {
+ private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> {
/**
* A flag to indicate that the task has been cancelled.
*/
@@ -234,6 +207,22 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
*/
operator fun invoke(): Unit = block()
- override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp)
+ override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp)
+ }
+
+ /**
+ * A future engine invocation.
+ *
+ * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case
+ * the invocation is not needed anymore, it can be cancelled via [cancel].
+ */
+ private class Invocation(
+ @JvmField val timestamp: Long,
+ @JvmField val handle: DisposableHandle
+ ) {
+ /**
+ * Cancel the engine invocation.
+ */
+ fun cancel() = handle.dispose()
}
}
diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
index 89b0cbe9..01f61f92 100644
--- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
+++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.opendc.simulator.core.runBlockingSimulation
+import java.time.Clock
+import kotlin.coroutines.EmptyCoroutineContext
/**
* A test suite for the [TimerScheduler] class.
@@ -34,14 +36,21 @@ import org.opendc.simulator.core.runBlockingSimulation
@OptIn(ExperimentalCoroutinesApi::class)
internal class TimerSchedulerTest {
@Test
+ fun testEmptyContext() {
+ assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) }
+ }
+
+ @Test
fun testBasicTimer() {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
- scheduler.close()
assertEquals(1000, clock.millis())
}
+
+ assertTrue(scheduler.isTimerActive(0))
+ assertFalse(scheduler.isTimerActive(1))
}
}
@@ -51,7 +60,6 @@ internal class TimerSchedulerTest {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.cancel(1)
- scheduler.close()
}
}
@@ -61,12 +69,11 @@ internal class TimerSchedulerTest {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
scheduler.startSingleTimer(0, 1000) {
- assertFalse(false)
+ fail()
}
scheduler.startSingleTimer(1, 100) {
scheduler.cancel(0)
- scheduler.close()
assertEquals(100, clock.millis())
}
@@ -78,15 +85,9 @@ internal class TimerSchedulerTest {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
- scheduler.startSingleTimer(0, 1000) {
- assertFalse(false)
- }
-
- scheduler.startSingleTimer(1, 100) {
- assertFalse(false)
- }
-
- scheduler.close()
+ scheduler.startSingleTimer(0, 1000) { fail() }
+ scheduler.startSingleTimer(1, 100) { fail() }
+ scheduler.cancelAll()
}
}
@@ -95,12 +96,9 @@ internal class TimerSchedulerTest {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
- scheduler.startSingleTimer(0, 1000) {
- assertFalse(false)
- }
+ scheduler.startSingleTimer(0, 1000) { fail() }
scheduler.startSingleTimer(0, 200) {
- scheduler.close()
assertEquals(200, clock.millis())
}
@@ -108,16 +106,14 @@ internal class TimerSchedulerTest {
}
@Test
- fun testStopped() {
+ fun testOverrideBlock() {
runBlockingSimulation {
val scheduler = TimerScheduler<Int>(coroutineContext, clock)
- scheduler.close()
+ scheduler.startSingleTimer(0, 1000) { fail() }
- assertThrows<IllegalStateException> {
- scheduler.startSingleTimer(1, 100) {
- assertFalse(false)
- }
+ scheduler.startSingleTimer(0, 1000) {
+ assertEquals(1000, clock.millis())
}
}
}
@@ -129,11 +125,9 @@ internal class TimerSchedulerTest {
assertThrows<IllegalArgumentException> {
scheduler.startSingleTimer(1, -1) {
- assertFalse(false)
+ fail()
}
}
-
- scheduler.close()
}
}
}