diff options
Diffstat (limited to 'opendc-simulator/opendc-simulator-core/src')
4 files changed, 340 insertions, 0 deletions
diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt new file mode 100644 index 00000000..9b284c11 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationBuilders.kt @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.* +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Executes a [body] inside an immediate execution dispatcher. + */ +@OptIn(ExperimentalCoroutinesApi::class) +public fun runBlockingSimulation(context: CoroutineContext = EmptyCoroutineContext, body: suspend SimulationCoroutineScope.() -> Unit) { + val (safeContext, dispatcher) = context.checkArguments() + val startingJobs = safeContext.activeJobs() + val scope = SimulationCoroutineScope(safeContext) + val deferred = scope.async { + body(scope) + } + dispatcher.advanceUntilIdle() + deferred.getCompletionExceptionOrNull()?.let { + throw it + } + val endingJobs = safeContext.activeJobs() + if ((endingJobs - startingJobs).isNotEmpty()) { + throw IllegalStateException("Test finished with active jobs: $endingJobs") + } +} + +/** + * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineScope]. + */ +public fun SimulationCoroutineScope.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runBlockingSimulation(coroutineContext, block) + +/** + * Convenience method for calling [runBlockingSimulation] on an existing [SimulationCoroutineDispatcher]. + */ +public fun SimulationCoroutineDispatcher.runBlockingSimulation(block: suspend SimulationCoroutineScope.() -> Unit): Unit = + runBlockingSimulation(this, block) + +private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, SimulationController> { + val dispatcher = get(ContinuationInterceptor).run { + this?.let { require(this is SimulationController) { "Dispatcher must implement SimulationController: $this" } } + this ?: SimulationCoroutineDispatcher() + } + + val job = get(Job) ?: SupervisorJob() + return Pair(this + dispatcher + job, dispatcher as SimulationController) +} + +private fun CoroutineContext.activeJobs(): Set<Job> { + return checkNotNull(this[Job]).children.filter { it.isActive }.toSet() +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt new file mode 100644 index 00000000..2b670b91 --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationController.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.CoroutineDispatcher +import java.time.Clock + +/** + * Control the virtual clock of a [CoroutineDispatcher]. + */ +public interface SimulationController { + /** + * The current virtual clock as it is known to this Dispatcher. + */ + public val clock: Clock + + /** + * Immediately execute all pending tasks and advance the virtual clock-time to the last delay. + * + * If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle` + * returns. + * + * @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds. + */ + public fun advanceUntilIdle(): Long +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt new file mode 100644 index 00000000..e2f7874c --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineDispatcher.kt @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.* +import java.lang.Runnable +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import java.util.* +import kotlin.coroutines.CoroutineContext + +/** + * A [CoroutineDispatcher] that performs both immediate execution of coroutines on the main thread and uses a virtual + * clock for time management. + */ +@OptIn(InternalCoroutinesApi::class) +public class SimulationCoroutineDispatcher : CoroutineDispatcher(), SimulationController, Delay { + /** + * The virtual clock of this dispatcher. + */ + override val clock: Clock = VirtualClock() + + /** + * Queue of ordered tasks to run. + */ + private val queue = PriorityQueue<TimedRunnable>() + + /** + * Global order counter. + */ + private var _counter = 0L + + /** + * The current virtual time of simulation + */ + private var _time = 0L + + override fun dispatch(context: CoroutineContext, block: Runnable) { + block.run() + } + + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + post(block) + } + + @OptIn(ExperimentalCoroutinesApi::class) + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { + postDelayed(CancellableContinuationRunnable(continuation) { resumeUndispatched(Unit) }, timeMillis) + } + + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + val node = postDelayed(block, timeMillis) + return object : DisposableHandle { + override fun dispose() { + queue.remove(node) + } + } + } + + override fun toString(): String { + return "SimulationCoroutineDispatcher[time=${_time}ms, queued=${queue.size}]" + } + + private fun post(block: Runnable) = + queue.add(TimedRunnable(block, _counter++)) + + private fun postDelayed(block: Runnable, delayTime: Long) = + TimedRunnable(block, _counter++, safePlus(_time, delayTime)) + .also { + queue.add(it) + } + + private fun safePlus(currentTime: Long, delayTime: Long): Long { + check(delayTime >= 0) + val result = currentTime + delayTime + if (result < currentTime) return Long.MAX_VALUE // clamp on overflow + return result + } + + override fun advanceUntilIdle(): Long { + val queue = queue + val oldTime = _time + while (queue.isNotEmpty()) { + val current = queue.poll() + + // If the scheduled time is 0 (immediate) use current virtual time + if (current.time != 0L) { + _time = current.time + } + + current.run() + } + + return _time - oldTime + } + + private inner class VirtualClock(private val zone: ZoneId = ZoneId.systemDefault()) : Clock() { + override fun getZone(): ZoneId = zone + + override fun withZone(zone: ZoneId): Clock = VirtualClock(zone) + + override fun instant(): Instant = Instant.ofEpochMilli(millis()) + + override fun millis(): Long = _time + + override fun toString(): String = "SimulationCoroutineDispatcher.VirtualClock[time=$_time]" + } + + /** + * This class exists to allow cleanup code to avoid throwing for cancelled continuations scheduled + * in the future. + */ + private class CancellableContinuationRunnable<T>( + @JvmField val continuation: CancellableContinuation<T>, + private val block: CancellableContinuation<T>.() -> Unit + ) : Runnable { + override fun run() = continuation.block() + } + + /** + * A Runnable for our event loop that represents a task to perform at a time. + */ + private class TimedRunnable( + @JvmField val runnable: Runnable, + private val count: Long = 0, + @JvmField val time: Long = 0 + ) : Comparable<TimedRunnable>, Runnable by runnable { + override fun compareTo(other: TimedRunnable) = if (time == other.time) { + count.compareTo(other.count) + } else { + time.compareTo(other.time) + } + + override fun toString() = "TimedRunnable[time=$time, run=$runnable]" + } +} diff --git a/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt new file mode 100644 index 00000000..1da7f0fa --- /dev/null +++ b/opendc-simulator/opendc-simulator-core/src/main/kotlin/org/opendc/simulator/core/SimulationCoroutineScope.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.core + +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * A scope which provides detailed control over the execution of coroutines for simulations. + */ +public interface SimulationCoroutineScope : CoroutineScope, SimulationController + +private class SimulationCoroutineScopeImpl( + override val coroutineContext: CoroutineContext +) : + SimulationCoroutineScope, + SimulationController by coroutineContext.simulationController + +/** + * A scope which provides detailed control over the execution of coroutines for simulations. + * + * If the provided context does not provide a [ContinuationInterceptor] (Dispatcher) or [CoroutineExceptionHandler], the + * scope adds [SimulationCoroutineDispatcher] automatically. + */ +@Suppress("FunctionName") +public fun SimulationCoroutineScope(context: CoroutineContext = EmptyCoroutineContext): SimulationCoroutineScope { + var safeContext = context + if (context[ContinuationInterceptor] == null) safeContext += SimulationCoroutineDispatcher() + return SimulationCoroutineScopeImpl(safeContext) +} + +private inline val CoroutineContext.simulationController: SimulationController + get() { + val handler = this[ContinuationInterceptor] + return handler as? SimulationController ?: throw IllegalArgumentException( + "SimulationCoroutineScope requires a SimulationController such as SimulatorCoroutineDispatcher as " + + "the ContinuationInterceptor (Dispatcher)" + ) + } |
