diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-05 13:22:40 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2022-10-05 14:03:01 +0200 |
| commit | c214a7fe0d46ecc23a71f9237b20281c0ca1c929 (patch) | |
| tree | c3f173a0f20510052fc369fe02fc899c28f5a8ad /opendc-simulator/opendc-simulator-core/src/main/kotlin | |
| parent | 44173c342d698441fbbcba4685c78f9bee40d138 (diff) | |
refactor(sim/core): Use SimulationScheduler in coroutine dispatcher
This change updates the implementation of `SimulationDispatcher` to use
a (possibly user-provided) `SimulationScheduler` for managing the
execution of the simulation and future tasks.
Diffstat (limited to 'opendc-simulator/opendc-simulator-core/src/main/kotlin')
| -rw-r--r-- | opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt | 167 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt) | 19 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt) | 8 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt | 94 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt (renamed from opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt) | 12 |
5 files changed, 121 insertions, 179 deletions
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt deleted file mode 100644 index 908e902a..00000000 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.core - -import kotlinx.coroutines.* -import java.lang.Runnable -import java.time.Clock -import java.time.Instant -import java.time.ZoneId -import java.util.* -import kotlin.coroutines.CoroutineContext - -/** - * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual - * clock for time management. - */ -@OptIn(InternalCoroutinesApi::class) -public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay { - /** - * Queue of ordered tasks to run. - */ - private val queue = PriorityQueue<TimedRunnable>() - - /** - * Global order counter. - */ - private var _counter = 0L - - /** - * The current virtual time of simulation - */ - private var _clock = SimClock() - - /** - * The virtual clock of this dispatcher. - */ - override val clock: Clock = ClockAdapter(_clock) - - override fun dispatch(context: CoroutineContext, block: Runnable) { - block.run() - } - - override fun dispatchYield(context: CoroutineContext, block: Runnable) { - post(block) - } - - @OptIn(ExperimentalCoroutinesApi::class) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - postDelayed(CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }, timeMillis) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val node = postDelayed(block, timeMillis) - return object : DisposableHandle { - override fun dispose() { - queue.remove(node) - } - } - } - - override fun toString(): String { - return "SimulationCoroutineDispatcher[time=${_clock.time}ms, queued=${queue.size}]" - } - - private fun post(block: Runnable) = - queue.add(TimedRunnable(block, _counter++)) - - private fun postDelayed(block: Runnable, delayTime: Long) = - TimedRunnable(block, _counter++, safePlus(_clock.time, delayTime)) - .also { - queue.add(it) - } - - private fun safePlus(currentTime: Long, delayTime: Long): Long { - check(delayTime >= 0) - val result = currentTime + delayTime - if (result < currentTime) return Long.MAX_VALUE // clamp on overflow - return result - } - - override fun advanceUntilIdle(): Long { - val queue = queue - val clock = _clock - val oldTime = clock.time - - while (true) { - val current = queue.poll() ?: break - - // If the scheduled time is 0 (immediate) use current virtual time - if (current.time != 0L) { - clock.time = current.time - } - - current.run() - } - - return clock.time - oldTime - } - - /** - * A helper class that holds the time of the simulation. - */ - private class SimClock(@JvmField var time: Long = 0) - - /** - * A helper class to expose a [Clock] instance for this dispatcher. - */ - private class ClockAdapter(private val clock: SimClock, private val zone: ZoneId = ZoneId.systemDefault()) : Clock() { - override fun getZone(): ZoneId = zone - - override fun withZone(zone: ZoneId): Clock = ClockAdapter(clock, zone) - - override fun instant(): Instant = Instant.ofEpochMilli(millis()) - - override fun millis(): Long = clock.time - - override fun toString(): String = "SimulationCoroutineDispatcher.ClockAdapter[time=${clock.time}]" - } - - /** - * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled - * in the future. - */ - private class CancellableContinuationRunnable<T>( - @JvmField val continuation: CancellableContinuation<T>, - private val block: CancellableContinuation<T>.() -> Unit - ) : Runnable { - override fun run() = continuation.block() - } - - /** - * A Runnable for our event loop that represents a task to perform at a time. - */ - private class TimedRunnable( - @JvmField val runnable: Runnable, - private val count: Long = 0, - @JvmField val time: Long = 0 - ) : Comparable<TimedRunnable>, Runnable by runnable { - override fun compareTo(other: TimedRunnable) = if (time == other.time) { - count.compareTo(other.count) - } else { - time.compareTo(other.time) - } - - override fun toString() = "TimedRunnable[time=$time, run=$runnable]" - } -} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt index 9b284c11..a291b4e2 100644 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationBuilders.kt @@ -20,9 +20,10 @@ * SOFTWARE. */ -package org.opendc.simulator.core +package org.opendc.simulator.kotlin import kotlinx.coroutines.* +import org.opendc.simulator.SimulationScheduler import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -31,8 +32,12 @@ import kotlin.coroutines.EmptyCoroutineContext * Executes a [body] inside an immediate execution dispatcher. */ @OptIn(ExperimentalCoroutinesApi::class) -public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineContext, body: suspend SimulationCoroutineScope.() -> Unit) { - val (safeContext, dispatcher) = context.checkArguments() +public fun runBlockingSimulation( + context: CoroutineContext = EmptyCoroutineContext, + scheduler: SimulationScheduler = SimulationScheduler(), + body: suspend SimulationCoroutineScope.() -> Unit +) { + val (safeContext, dispatcher) = context.checkArguments(scheduler) val startingJobs = safeContext.activeJobs() val scope = SimulationCoroutineScope(safeContext) val deferred = scope.async { @@ -52,18 +57,18 @@ public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineConte * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope]. */ public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = - runBlockingSimulation(coroutineContext, block) + runBlockingSimulation(coroutineContext, scheduler, block) /** * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher]. */ public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = - runBlockingSimulation(this, block) + runBlockingSimulation(this, scheduler, block) -private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, SimulationController> { +private fun CoroutineContext.checkArguments(scheduler: SimulationScheduler): Pair<CoroutineContext, SimulationController> { val dispatcher = get(ContinuationInterceptor).run { this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } } - this ?: SimulationCoroutineDispatcher() + this ?: SimulationCoroutineDispatcher(scheduler) } val job = get(Job) ?: SupervisorJob() diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt index 2b670b91..f96b2326 100644 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationController.kt @@ -20,9 +20,10 @@ * SOFTWARE. */ -package org.opendc.simulator.core +package org.opendc.simulator.kotlin import kotlinx.coroutines.CoroutineDispatcher +import org.opendc.simulator.SimulationScheduler import java.time.Clock /** @@ -35,6 +36,11 @@ public interface SimulationController { public val clock: Clock /** + * The [SimulationScheduler] driving the simulation. + */ + public val scheduler: SimulationScheduler + + /** * Immediately execute all pending tasks and advance the virtual clock-time to the last delay. * * If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle` diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt new file mode 100644 index 00000000..21ad1a86 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineDispatcher.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.kotlin + +import kotlinx.coroutines.* +import org.opendc.simulator.SimulationScheduler +import java.lang.Runnable +import java.time.Clock +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual + * clock for time management. + * + * @param scheduler The [SimulationScheduler] used to manage the execution of future tasks. + */ +@OptIn(InternalCoroutinesApi::class) +public class SimulationCoroutineDispatcher( + override val scheduler: SimulationScheduler = SimulationScheduler() +) : CoroutineDispatcher(), SimulationController, Delay { + /** + * The virtual clock of this dispatcher. + */ + override val clock: Clock = scheduler.clock + + override fun dispatch(context: CoroutineContext, block: Runnable) { + block.run() + } + + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + scheduler.execute(block) + } + + @OptIn(ExperimentalCoroutinesApi::class) + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { + scheduler.schedule(timeMillis, CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }) + } + + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + return object : DisposableHandle { + private val deadline = (scheduler.currentTime + timeMillis).let { if (it >= 0) it else Long.MAX_VALUE } + private val id = scheduler.schedule(timeMillis, block) + + override fun dispose() { + scheduler.cancel(deadline, id) + } + } + } + + override fun toString(): String { + return "SimulationCoroutineDispatcher[time=${scheduler.currentTime}ms]" + } + + override fun advanceUntilIdle(): Long { + val scheduler = scheduler + val oldTime = scheduler.currentTime + + scheduler.advanceUntilIdle() + + return scheduler.currentTime - oldTime + } + + /** + * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled + * in the future. + */ + private class CancellableContinuationRunnable<T>( + @JvmField val continuation: CancellableContinuation<T>, + private val block: CancellableContinuation<T>.() -> Unit + ) : Runnable { + override fun run() = continuation.block() + } +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt index 1da7f0fa..6be8e67a 100644 --- a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/kotlin/SimulationCoroutineScope.kt @@ -20,16 +20,17 @@ * SOFTWARE. */ -package org.opendc.simulator.core +package org.opendc.simulator.kotlin import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope +import org.opendc.simulator.SimulationScheduler import kotlin.coroutines.ContinuationInterceptor import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext /** - * A scope which provides detailed control over the execution of coroutines for simulations. + * A scope which provides detailed control over the execution of coroutines for simulations. */ public interface SimulationCoroutineScope : CoroutineScope, SimulationController @@ -46,9 +47,12 @@ private class SimulationCoroutineScopeImpl( * scope adds [SimulationCoroutineDispatcher] automatically. */ @Suppress("FunctionName") -public fun SimulationCoroutineScope(context: CoroutineContext = EmptyCoroutineContext): SimulationCoroutineScope { +public fun SimulationCoroutineScope( + context: CoroutineContext = EmptyCoroutineContext, + scheduler: SimulationScheduler = SimulationScheduler() +): SimulationCoroutineScope { var safeContext = context - if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher() + if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher(scheduler) return SimulationCoroutineScopeImpl(safeContext) } |
