From 95a0ed6911f136fb25bb76d6b6e010bf66b8ba5b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 27 Mar 2021 12:52:47 +0100 Subject: utils: Cache variables in TimerScheduler's hot paths This change updates the TimerScheduler implementation to cache several variables in the hot paths of the implementation. --- .../src/main/kotlin/org/opendc/utils/TimerScheduler.kt | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'simulator/opendc-utils/src/main') diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index d4bc7b5c..175c2b51 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -34,8 +34,8 @@ import kotlin.math.max /** * A TimerScheduler facilitates scheduled execution of future tasks. * - * @property context The [CoroutineContext] to run the tasks with. - * @property clock The clock to keep track of the time. + * @param context The [CoroutineContext] to run the tasks with. + * @param clock The clock to keep track of the time. */ @OptIn(ExperimentalCoroutinesApi::class) public class TimerScheduler(context: CoroutineContext, private val clock: Clock) : AutoCloseable { @@ -66,6 +66,8 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo val timers = timers val queue = queue val clock = clock + val job = requireNotNull(coroutineContext[Job]) + val exceptionHandler = coroutineContext[CoroutineExceptionHandler] var next: Long? = channel.receive() while (true) { @@ -75,7 +77,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select onTimeout(delay) { - while (queue.isNotEmpty() && isActive) { + while (queue.isNotEmpty() && job.isActive) { val timer = queue.peek() val timestamp = clock.millis() @@ -93,7 +95,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo try { timer() } catch (e: Throwable) { - coroutineContext[CoroutineExceptionHandler]?.handleException(coroutineContext, e) + exceptionHandler?.handleException(coroutineContext, e) } } } @@ -131,11 +133,13 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo timer?.isCancelled = true // Optimization: check whether we are the head of the queue - if (queue.peek() == timer) { + val queue = queue + val peek = queue.peek() + if (peek == timer) { queue.poll() if (queue.isNotEmpty()) { - channel.sendBlocking(queue.peek().timestamp) + channel.sendBlocking(peek.timestamp) } else { channel.sendBlocking(null) } -- cgit v1.2.3 From 3d707674ddfa96ae5c090a7c918350b0bef9b50f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 7 Apr 2021 21:20:42 +0200 Subject: utils: Prevent use of Intrinsics.areEqual in scheduler hot path This change changes the TimerScheduler implementation to prevent calling Intrinsics.areEqual in the hot path. Profiling shows that especially this call has a high overhead. --- .../main/kotlin/org/opendc/utils/TimerScheduler.kt | 80 ++++++++++++---------- 1 file changed, 45 insertions(+), 35 deletions(-) (limited to 'simulator/opendc-utils/src/main') diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt index 175c2b51..aa2f3367 100644 --- a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -60,47 +60,51 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo private val channel = Channel(Channel.CONFLATED) /** - * The scheduling job. + * A flag to indicate that the scheduler is active. */ - private val job = scope.launch { - val timers = timers - val queue = queue - val clock = clock - val job = requireNotNull(coroutineContext[Job]) - val exceptionHandler = coroutineContext[CoroutineExceptionHandler] - var next: Long? = channel.receive() + private var isActive = true - while (true) { - next = select { - channel.onReceive { it } + init { + scope.launch { + val timers = timers + val queue = queue + val clock = clock + val job = requireNotNull(coroutineContext[Job]) + val exceptionHandler = coroutineContext[CoroutineExceptionHandler] + var next: Long? = channel.receive() - val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select + while (true) { + next = select { + channel.onReceive { it } - onTimeout(delay) { - while (queue.isNotEmpty() && job.isActive) { - val timer = queue.peek() - val timestamp = clock.millis() + val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select - assert(timer.timestamp >= timestamp) { "Found task in the past" } + onTimeout(delay) { + while (queue.isNotEmpty() && job.isActive) { + val timer = queue.peek() + val timestamp = clock.millis() - if (timer.timestamp > timestamp && !timer.isCancelled) { - // Schedule a task for the next event to occur. - return@onTimeout timer.timestamp - } + assert(timer.timestamp >= timestamp) { "Found task in the past" } + + if (timer.timestamp > timestamp && !timer.isCancelled) { + // Schedule a task for the next event to occur. + return@onTimeout timer.timestamp + } - queue.poll() + queue.poll() - if (!timer.isCancelled) { - timers.remove(timer.key) - try { - timer() - } catch (e: Throwable) { - exceptionHandler?.handleException(coroutineContext, e) + if (!timer.isCancelled) { + timers.remove(timer.key) + try { + timer() + } catch (e: Throwable) { + exceptionHandler?.handleException(coroutineContext, e) + } } } - } - null + null + } } } } @@ -110,6 +114,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo * Stop the scheduler. */ override fun close() { + isActive = false cancelAll() scope.cancel() } @@ -123,7 +128,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo * @param key The key of the timer to cancel. */ public fun cancel(key: T) { - if (!job.isActive) { + if (!isActive) { return } @@ -134,6 +139,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo // Optimization: check whether we are the head of the queue val queue = queue + val channel = channel val peek = queue.peek() if (peek == timer) { queue.poll() @@ -186,12 +192,14 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo */ public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { val now = clock.millis() + val queue = queue + val channel = channel require(timestamp >= now) { "Timestamp must be in the future" } - check(job.isActive) { "Timer is stopped" } + check(isActive) { "Timer is stopped" } timers.compute(key) { _, old -> - if (old?.timestamp == timestamp) { + if (old != null && old.timestamp == timestamp) { // Fast-path: timer for the same timestamp already exists old } else { @@ -202,8 +210,9 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo queue.add(timer) // Check if we need to push the interruption forward - if (queue.peek() == timer) { - channel.sendBlocking(timer.timestamp) + // Note that we check by timer reference + if (queue.peek() === timer) { + channel.offer(timer.timestamp) } timer @@ -218,6 +227,7 @@ public class TimerScheduler(context: CoroutineContext, private val clock: Clo /** * A flag to indicate that the task has been cancelled. */ + @JvmField var isCancelled: Boolean = false /** -- cgit v1.2.3