diff options
Diffstat (limited to 'opendc-common')
| -rw-r--r-- | opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt | 239 | ||||
| -rw-r--r-- | opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt | 46 |
2 files changed, 134 insertions, 151 deletions
diff --git a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt index 86314411..bec2c9f1 100644 --- a/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt +++ b/opendc-common/src/main/kotlin/org/opendc/common/util/TimerScheduler.kt @@ -23,12 +23,10 @@ package org.opendc.common.util import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.selects.select import java.time.Clock import java.util.* +import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext -import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. @@ -37,86 +35,83 @@ import kotlin.math.max * @param clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) -public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clock) : AutoCloseable { +public class TimerScheduler<T>(private val context: CoroutineContext, private val clock: Clock) { /** - * The scope in which the scheduler runs. + * The [Delay] instance that provides scheduled execution of [Runnable]s. */ - private val scope = CoroutineScope(context + Job()) + @OptIn(InternalCoroutinesApi::class) + private val delay = + requireNotNull(context[ContinuationInterceptor] as? Delay) { "Invalid CoroutineDispatcher: no delay implementation" } + + /** + * The stack of the invocations to occur in the future. + */ + private val invocations = ArrayDeque<Invocation>() /** * A priority queue containing the tasks to be scheduled in the future. */ - private val queue = PriorityQueue<Timer>() + private val queue = PriorityQueue<Timer<T>>() /** * A map that keeps track of the timers. */ - private val timers = mutableMapOf<T, Timer>() + private val timers = mutableMapOf<T, Timer<T>>() /** - * The channel to communicate with the scheduling job. + * Start a timer that will invoke the specified [block] after [delay]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param delay The delay before invoking the block. + * @param block The block to invoke. */ - private val channel = Channel<Long?>(Channel.CONFLATED) + public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { + startSingleTimerTo(key, clock.millis() + delay, block) + } /** - * A flag to indicate that the scheduler is active. + * Start a timer that will invoke the specified [block] at [timestamp]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param timestamp The timestamp at which to invoke the block. + * @param block The block to invoke. */ - private var isActive = true - - init { - scope.launch { - val timers = timers - val queue = queue - val clock = clock - val job = requireNotNull(coroutineContext[Job]) - val exceptionHandler = coroutineContext[CoroutineExceptionHandler] - var next: Long? = channel.receive() - - while (true) { - next = select { - channel.onReceive { it } - - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - - onTimeout(delay) { - while (queue.isNotEmpty() && job.isActive) { - val timer = queue.peek() - val timestamp = clock.millis() - - assert(timer.timestamp >= timestamp) { "Found task in the past" } - - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } - - queue.poll() - - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - exceptionHandler?.handleException(coroutineContext, e) - } - } - } - - null - } - } + public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { + val now = clock.millis() + val queue = queue + val invocations = invocations + + require(timestamp >= now) { "Timestamp must be in the future" } + + timers.compute(key) { _, old -> + if (old != null && old.timestamp == timestamp) { + // Fast-path: timer for the same timestamp already exists + old.block = block + old + } else { + // Slow-path: cancel old timer and replace it with new timer + val timer = Timer(key, timestamp, block) + + old?.isCancelled = true + queue.add(timer) + trySchedule(now, invocations, timestamp) + + timer } } } /** - * Stop the scheduler. + * Check if a timer with a given key is active. + * + * @param key The key to check if active. + * @return `true` if the timer with the specified [key] is active, `false` otherwise. */ - override fun close() { - isActive = false - cancelAll() - scope.cancel() - } + public fun isTimerActive(key: T): Boolean = key in timers /** * Cancel a timer with a given key. @@ -127,28 +122,10 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo * @param key The key of the timer to cancel. */ public fun cancel(key: T) { - if (!isActive) { - return - } - val timer = timers.remove(key) // Mark the timer as cancelled timer?.isCancelled = true - - // Optimization: check whether we are the head of the queue - val queue = queue - val channel = channel - val peek = queue.peek() - if (peek == timer) { - queue.poll() - - if (queue.isNotEmpty()) { - channel.trySend(peek.timestamp) - } else { - channel.trySend(null) - } - } } /** @@ -157,64 +134,60 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo public fun cancelAll() { queue.clear() timers.clear() - } - /** - * Check if a timer with a given key is active. - * - * @param key The key to check if active. - * @return `true` if the timer with the specified [key] is active, `false` otherwise. - */ - public fun isTimerActive(key: T): Boolean = key in timers + // Cancel all pending invocations + for (invocation in invocations) { + invocation.cancel() + } + invocations.clear() + } /** - * Start a timer that will invoke the specified [block] after [delay]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * Try to schedule an engine invocation at the specified [target]. * - * @param key The key of the timer to start. - * @param delay The delay before invoking the block. - * @param block The block to invoke. + * @param now The current virtual timestamp. + * @param target The virtual timestamp at which the engine invocation should happen. + * @param scheduled The queue of scheduled invocations. */ - public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { - startSingleTimerTo(key, clock.millis() + delay, block) + private fun trySchedule(now: Long, scheduled: ArrayDeque<Invocation>, target: Long) { + val head = scheduled.peek() + + // Only schedule a new scheduler invocation in case the target is earlier than all other pending + // scheduler invocations + if (head == null || target < head.timestamp) { + @OptIn(InternalCoroutinesApi::class) + val handle = delay.invokeOnTimeout(target - now, ::doRunTimers, context) + scheduled.addFirst(Invocation(target, handle)) + } } /** - * Start a timer that will invoke the specified [block] at [timestamp]. - * - * Each timer has a key and if a new timer with same key is started the previous is cancelled. - * - * @param key The key of the timer to start. - * @param timestamp The timestamp at which to invoke the block. - * @param block The block to invoke. + * This method is invoked when the earliest timer expires. */ - public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { - val now = clock.millis() - val queue = queue - val channel = channel + private fun doRunTimers() { + val invocations = invocations + val invocation = checkNotNull(invocations.poll()) // Clear invocation from future invocation queue + val now = invocation.timestamp - require(timestamp >= now) { "Timestamp must be in the future" } - check(isActive) { "Timer is stopped" } + while (queue.isNotEmpty()) { + val timer = queue.peek() - timers.compute(key) { _, old -> - if (old != null && old.timestamp == timestamp) { - // Fast-path: timer for the same timestamp already exists - old - } else { - // Slow-path: cancel old timer and replace it with new timer - val timer = Timer(key, timestamp, block) + val timestamp = timer.timestamp + val isCancelled = timer.isCancelled - old?.isCancelled = true - queue.add(timer) + assert(timestamp >= now) { "Found task in the past" } - // Check if we need to push the interruption forward - // Note that we check by timer reference - if (queue.peek() === timer) { - channel.trySend(timer.timestamp) - } + if (timestamp > now && !isCancelled) { + // Schedule a task for the next event to occur. + trySchedule(now, invocations, timestamp) + break + } - timer + queue.poll() + + if (!isCancelled) { + timers.remove(timer.key) + timer() } } } @@ -222,7 +195,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo /** * A task that is scheduled to run in the future. */ - private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable<Timer> { + private class Timer<T>(val key: T, val timestamp: Long, var block: () -> Unit) : Comparable<Timer<T>> { /** * A flag to indicate that the task has been cancelled. */ @@ -234,6 +207,22 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo */ operator fun invoke(): Unit = block() - override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) + override fun compareTo(other: Timer<T>): Int = timestamp.compareTo(other.timestamp) + } + + /** + * A future engine invocation. + * + * This class is used to keep track of the future engine invocations created using the [Delay] instance. In case + * the invocation is not needed anymore, it can be cancelled via [cancel]. + */ + private class Invocation( + @JvmField val timestamp: Long, + @JvmField val handle: DisposableHandle + ) { + /** + * Cancel the engine invocation. + */ + fun cancel() = handle.dispose() } } diff --git a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt index 89b0cbe9..01f61f92 100644 --- a/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt +++ b/opendc-common/src/test/kotlin/org/opendc/common/util/TimerSchedulerTest.kt @@ -27,6 +27,8 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import org.opendc.simulator.core.runBlockingSimulation +import java.time.Clock +import kotlin.coroutines.EmptyCoroutineContext /** * A test suite for the [TimerScheduler] class. @@ -34,14 +36,21 @@ import org.opendc.simulator.core.runBlockingSimulation @OptIn(ExperimentalCoroutinesApi::class) internal class TimerSchedulerTest { @Test + fun testEmptyContext() { + assertThrows<IllegalArgumentException> { TimerScheduler<Unit>(EmptyCoroutineContext, Clock.systemUTC()) } + } + + @Test fun testBasicTimer() { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { - scheduler.close() assertEquals(1000, clock.millis()) } + + assertTrue(scheduler.isTimerActive(0)) + assertFalse(scheduler.isTimerActive(1)) } } @@ -51,7 +60,6 @@ internal class TimerSchedulerTest { val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.cancel(1) - scheduler.close() } } @@ -61,12 +69,11 @@ internal class TimerSchedulerTest { val scheduler = TimerScheduler<Int>(coroutineContext, clock) scheduler.startSingleTimer(0, 1000) { - assertFalse(false) + fail() } scheduler.startSingleTimer(1, 100) { scheduler.cancel(0) - scheduler.close() assertEquals(100, clock.millis()) } @@ -78,15 +85,9 @@ internal class TimerSchedulerTest { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } - - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } - - scheduler.close() + scheduler.startSingleTimer(0, 1000) { fail() } + scheduler.startSingleTimer(1, 100) { fail() } + scheduler.cancelAll() } } @@ -95,12 +96,9 @@ internal class TimerSchedulerTest { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) - scheduler.startSingleTimer(0, 1000) { - assertFalse(false) - } + scheduler.startSingleTimer(0, 1000) { fail() } scheduler.startSingleTimer(0, 200) { - scheduler.close() assertEquals(200, clock.millis()) } @@ -108,16 +106,14 @@ internal class TimerSchedulerTest { } @Test - fun testStopped() { + fun testOverrideBlock() { runBlockingSimulation { val scheduler = TimerScheduler<Int>(coroutineContext, clock) - scheduler.close() + scheduler.startSingleTimer(0, 1000) { fail() } - assertThrows<IllegalStateException> { - scheduler.startSingleTimer(1, 100) { - assertFalse(false) - } + scheduler.startSingleTimer(0, 1000) { + assertEquals(1000, clock.millis()) } } } @@ -129,11 +125,9 @@ internal class TimerSchedulerTest { assertThrows<IllegalArgumentException> { scheduler.startSingleTimer(1, -1) { - assertFalse(false) + fail() } } - - scheduler.close() } } } |
