summaryrefslogtreecommitdiff
path: root/simulator/opendc-utils/src
diff options
context:
space:
mode:
Diffstat (limited to 'simulator/opendc-utils/src')
-rw-r--r--simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt80
1 files changed, 45 insertions, 35 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
index 175c2b51..aa2f3367 100644
--- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
+++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt
@@ -60,47 +60,51 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
private val channel = Channel<Long?>(Channel.CONFLATED)
/**
- * The scheduling job.
+ * A flag to indicate that the scheduler is active.
*/
- private val job = scope.launch {
- val timers = timers
- val queue = queue
- val clock = clock
- val job = requireNotNull(coroutineContext[Job])
- val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
- var next: Long? = channel.receive()
+ private var isActive = true
- while (true) {
- next = select {
- channel.onReceive { it }
+ init {
+ scope.launch {
+ val timers = timers
+ val queue = queue
+ val clock = clock
+ val job = requireNotNull(coroutineContext[Job])
+ val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
+ var next: Long? = channel.receive()
- val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
+ while (true) {
+ next = select {
+ channel.onReceive { it }
- onTimeout(delay) {
- while (queue.isNotEmpty() && job.isActive) {
- val timer = queue.peek()
- val timestamp = clock.millis()
+ val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select
- assert(timer.timestamp >= timestamp) { "Found task in the past" }
+ onTimeout(delay) {
+ while (queue.isNotEmpty() && job.isActive) {
+ val timer = queue.peek()
+ val timestamp = clock.millis()
- if (timer.timestamp > timestamp && !timer.isCancelled) {
- // Schedule a task for the next event to occur.
- return@onTimeout timer.timestamp
- }
+ assert(timer.timestamp >= timestamp) { "Found task in the past" }
+
+ if (timer.timestamp > timestamp && !timer.isCancelled) {
+ // Schedule a task for the next event to occur.
+ return@onTimeout timer.timestamp
+ }
- queue.poll()
+ queue.poll()
- if (!timer.isCancelled) {
- timers.remove(timer.key)
- try {
- timer()
- } catch (e: Throwable) {
- exceptionHandler?.handleException(coroutineContext, e)
+ if (!timer.isCancelled) {
+ timers.remove(timer.key)
+ try {
+ timer()
+ } catch (e: Throwable) {
+ exceptionHandler?.handleException(coroutineContext, e)
+ }
}
}
- }
- null
+ null
+ }
}
}
}
@@ -110,6 +114,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
* Stop the scheduler.
*/
override fun close() {
+ isActive = false
cancelAll()
scope.cancel()
}
@@ -123,7 +128,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
* @param key The key of the timer to cancel.
*/
public fun cancel(key: T) {
- if (!job.isActive) {
+ if (!isActive) {
return
}
@@ -134,6 +139,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
// Optimization: check whether we are the head of the queue
val queue = queue
+ val channel = channel
val peek = queue.peek()
if (peek == timer) {
queue.poll()
@@ -186,12 +192,14 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
*/
public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) {
val now = clock.millis()
+ val queue = queue
+ val channel = channel
require(timestamp >= now) { "Timestamp must be in the future" }
- check(job.isActive) { "Timer is stopped" }
+ check(isActive) { "Timer is stopped" }
timers.compute(key) { _, old ->
- if (old?.timestamp == timestamp) {
+ if (old != null && old.timestamp == timestamp) {
// Fast-path: timer for the same timestamp already exists
old
} else {
@@ -202,8 +210,9 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
queue.add(timer)
// Check if we need to push the interruption forward
- if (queue.peek() == timer) {
- channel.sendBlocking(timer.timestamp)
+ // Note that we check by timer reference
+ if (queue.peek() === timer) {
+ channel.offer(timer.timestamp)
}
timer
@@ -218,6 +227,7 @@ public class TimerScheduler<T>(context: CoroutineContext, private val clock: Clo
/**
* A flag to indicate that the task has been cancelled.
*/
+ @JvmField
var isCancelled: Boolean = false
/**