diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-08 18:18:43 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-01-11 15:23:56 +0100 |
| commit | a71d4885efcf01850bc236d3e9f77ab3f44b48aa (patch) | |
| tree | 797c65e0e5a37b73820ba4ef5d377b4a5524cd5f /simulator/opendc-utils/src/main | |
| parent | 42e9a5b5b610f41a03e68f6fc781c54b9402925b (diff) | |
Convert to pull-based workload model
This change converts the low-level workload model to be pull-based. This
reduces the overhead that we experienced with our previous co-routine
based approach.
Diffstat (limited to 'simulator/opendc-utils/src/main')
| -rw-r--r-- | simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt new file mode 100644 index 00000000..ff116443 --- /dev/null +++ b/simulator/opendc-utils/src/main/kotlin/org/opendc/utils/TimerScheduler.kt @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.utils + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.sendBlocking +import kotlinx.coroutines.launch +import kotlinx.coroutines.selects.select +import java.time.Clock +import java.util.* +import kotlin.math.max + +/** + * A TimerScheduler facilitates scheduled execution of future tasks. + * + * @property coroutineScope The [CoroutineScope] to run the tasks in. + * @property clock The clock to keep track of the time. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public class TimerScheduler<T>(private val coroutineScope: CoroutineScope, private val clock: Clock) : AutoCloseable { + /** + * A priority queue containing the tasks to be scheduled in the future. + */ + private val queue = PriorityQueue<Timer>() + + /** + * A map that keeps track of the timers. + */ + private val timers = mutableMapOf<T, Timer>() + + /** + * The channel to communicate with the + */ + private val channel = Channel<Long?>(Channel.CONFLATED) + + /** + * The scheduling job. + */ + private val job = coroutineScope.launch { + val queue = queue + var next: Long? = channel.receive() + + while (true) { + next = select { + channel.onReceive { it } + + val delay = next?.let { max(0L, it - clock.millis()) } ?: return@select + + onTimeout(delay) { + while (queue.isNotEmpty()) { + val timer = queue.peek() + val timestamp = clock.millis() + + assert(timer.timestamp >= timestamp) { "Found task in the past" } + + if (timer.timestamp > timestamp && !timer.isCancelled) { + // Schedule a task for the next event to occur. + return@onTimeout timer.timestamp + } + + queue.poll() + + if (!timer.isCancelled) { + timers.remove(timer.key) + timer() + } + } + + null + } + } + } + } + + /** + * Stop the scheduler. + */ + override fun close() { + cancelAll() + job.cancel() + } + + /** + * Cancel a timer with a given key. + * + * If canceling a timer that was already canceled, or key never was used to start + * a timer this operation will do nothing. + * + * @param key The key of the timer to cancel. + */ + public fun cancel(key: T) { + if (!job.isActive) { + return + } + + val timer = timers.remove(key) + + // Mark the timer as cancelled + timer?.isCancelled = true + + // Optimization: check whether we are the head of the queue + if (queue.peek() == timer) { + queue.poll() + + if (queue.isNotEmpty()) { + channel.sendBlocking(queue.peek().timestamp) + } else { + channel.sendBlocking(null) + } + } + } + + /** + * Cancel all timers. + */ + public fun cancelAll() { + queue.clear() + timers.clear() + } + + /** + * Check if a timer with a given key is active. + * + * @param key The key to check if active. + * @return `true` if the timer with the specified [key] is active, `false` otherwise. + */ + public fun isTimerActive(key: T): Boolean = key in timers + + /** + * Start a timer that will invoke the specified [block] after [delay]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param delay The delay before invoking the block. + * @param block The block to invoke. + */ + public fun startSingleTimer(key: T, delay: Long, block: () -> Unit) { + startSingleTimerTo(key, clock.millis() + delay, block) + } + + /** + * Start a timer that will invoke the specified [block] at [timestamp]. + * + * Each timer has a key and if a new timer with same key is started the previous is cancelled. + * + * @param key The key of the timer to start. + * @param timestamp The timestamp at which to invoke the block. + * @param block The block to invoke. + */ + public fun startSingleTimerTo(key: T, timestamp: Long, block: () -> Unit) { + val now = clock.millis() + + require(timestamp >= now) { "Timestamp must be in the future" } + check(job.isActive) { "Timer is stopped" } + + val timer = Timer(key, timestamp, block) + + timers.compute(key) { _, old -> + old?.isCancelled = true + timer + } + queue.add(timer) + + // Check if we need to push the interruption forward + if (queue.peek() == timer) { + channel.sendBlocking(timer.timestamp) + } + } + + /** + * A task that is scheduled to run in the future. + */ + private inner class Timer(val key: T, val timestamp: Long, val block: () -> Unit) : Comparable<Timer> { + /** + * A flag to indicate that the task has been cancelled. + */ + var isCancelled: Boolean = false + + /** + * Run the task. + */ + operator fun invoke(): Unit = block() + + override fun compareTo(other: Timer): Int = timestamp.compareTo(other.timestamp) + } +} |
