diff options
Diffstat (limited to 'simulator/opendc-utils/src')
| -rw-r--r-- | simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt | 80 |
1 files changed, 45 insertions, 35 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index 175c2b51..aa2f3367 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -60,47 +60,51 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo private val channel = Channel<Long?>(Channel.CONFLATED) /** - * The scheduling job. + * A flag to indicate that the scheduler is active. */ - private val job = scope.launch { - val timers = timers - val queue = queue - val clock = clock - val job = requireNotNull(coroutineContext[Job]) - val exceptionHandler = coroutineContext[CoroutineExceptionHandler] - var next: Long? = channel.receive() + private var isActive = true - while (true) { - next = select { - channel.onReceive { it } + init { + scope.launch { + val timers = timers + val queue = queue + val clock = clock + val job = requireNotNull(coroutineContext[Job]) + val exceptionHandler = coroutineContext[CoroutineExceptionHandler] + var next: Long? = channel.receive() - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select + while (true) { + next = select { + channel.onReceive { it } - onTimeout(delay) { - while (queue.isNotEmpty() && job.isActive) { - val timer = queue.peek() - val timestamp = clock.millis() + val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - assert(timer.timestamp >= timestamp) { "Found task in the past" } + onTimeout(delay) { + while (queue.isNotEmpty() && job.isActive) { + val timer = queue.peek() + val timestamp = clock.millis() - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } + assert(timer.timestamp >= timestamp) { "Found task in the past" } + + if (timer.timestamp > timestamp && !timer.isCancelled) { + // Schedule a task for the next event to occur. + return@onTimeout timer.timestamp + } - queue.poll() + queue.poll() - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - exceptionHandler?.handleException(coroutineContext, e) + if (!timer.isCancelled) { + timers.remove(timer.key) + try { + timer() + } catch (e: Throwable) { + exceptionHandler?.handleException(coroutineContext, e) + } } } - } - null + null + } } } } @@ -110,6 +114,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo * Stop the scheduler. */ override fun close() { + isActive = false cancelAll() scope.cancel() } @@ -123,7 +128,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo * @param key The key of the timer to cancel. */ public fun cancel(key: T) { - if (!job.isActive) { + if (!isActive) { return } @@ -134,6 +139,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo // Optimization: check whether we are the head of the queue val queue = queue + val channel = channel val peek = queue.peek() if (peek == timer) { queue.poll() @@ -186,12 +192,14 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo */ public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { val now = clock.millis() + val queue = queue + val channel = channel require(timestamp >= now) { "Timestamp must be in the future" } - check(job.isActive) { "Timer is stopped" } + check(isActive) { "Timer is stopped" } timers.compute(key) { _, old -> - if (old?.timestamp == timestamp) { + if (old != null && old.timestamp == timestamp) { // Fast-path: timer for the same timestamp already exists old } else { @@ -202,8 +210,9 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo queue.add(timer) // Check if we need to push the interruption forward - if (queue.peek() == timer) { - channel.sendBlocking(timer.timestamp) + // Note that we check by timer reference + if (queue.peek() === timer) { + channel.offer(timer.timestamp) } timer @@ -218,6 +227,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo /** * A flag to indicate that the task has been cancelled. */ + @JvmField var isCancelled: Boolean = false /** |
