diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-23 12:17:48 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-23 12:17:48 +0100 |
| commit | f691a72b12a43fa15c1617966450c55206664797 (patch) | |
| tree | e76afd3b1d5673a29d71eedb9d373396976d84bd /opendc-kernel-omega/src | |
| parent | 8666a78b86a40c1d8dab28dd18e841318c01f97f (diff) | |
| parent | 86dc826db4cd91b5a6875d9ecdd64c9238d7b95c (diff) | |
refactor(#18): Redesign core simulation API
This change contains the redesign of the core simulation API and
provides a cleaner interface for developing simulation models for the
users.
Closes #18
Diffstat (limited to 'opendc-kernel-omega/src')
4 files changed, 622 insertions, 0 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt new file mode 100644 index 00000000..d63a53c8 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt @@ -0,0 +1,30 @@ +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Process + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up + * and resume execution. + * + * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to + * wake up a process from another entity. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Resume + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been + * reached and that it should wake up and resume execution. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Timeout + +/** + * An internal message used by the Omega simulation kernel to launch a process. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +data class Launch<M>(val process: Process<*, M>) diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt new file mode 100644 index 00000000..c0ab9fb4 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Bootstrap +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.kernel.Simulation + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object OmegaKernel : Kernel { + /** + * Create a simulation over the given model facilitated by this simulation kernel. + * + * @param bootstrap The apply procedure to apply the simulation with. + * @return A [Simulation] instance to control the simulation. + */ + override fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> = OmegaSimulation(bootstrap) +} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt new file mode 100644 index 00000000..532a033a --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -0,0 +1,402 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.kernel.Simulation +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @property model The model that is simulated. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Bootstrap.Context<M> { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the processes used in the simulation. + */ + private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue<Envelope> = PriorityQueue(Comparator + .comparingLong(Envelope::time) + .thenComparingLong(Envelope::id)) + + /** + * The kernel process instance that handles internal operations during the simulation. + */ + private val process = object : Process<Unit, M> { + override val initialState = Unit + + override suspend fun Context<Unit, M>.run() { + while(true) { + val msg = receive() + when (msg) { + is Launch<*> -> + @Suppress("UNCHECKED_CAST") + launch((msg as Launch<M>).process) + } + } + } + } + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>? + get() = registry[this] as? OmegaContext<S> + + /** + * The simulation time. + */ + override var time: Instant = 0 + + /** + * The model of simulation. + */ + // XXX: the bootstrap requires the properties of this class to be initialised, so changing the order may cause NPEs + override var model: M = bootstrap.apply(this) + + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ + override val <E : Entity<S, *>, S> E.state: S + get() = context?.state ?: initialState + + /** + * Initialise the simulation instance. + */ + init { + // Launch the Omega kernel process + launch(process) + } + + // Bootstrap Context implementation + override fun register(entity: Entity<*, M>): Boolean { + if (!registry.containsKey(entity) && entity !is Process) { + return false + } + + schedule(Launch(entity as Process<*, M>), process) + return true + } + + override fun deregister(entity: Entity<*, M>): Boolean { + val context = entity.context ?: return false + context.resume(Unit) + return true + } + + override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = + schedule(prepare(message, destination, sender, delay)) + + // Simulation implementation + override fun step() { + while (true) { + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > time) { + // Tick has yet to occur + // Jump in time to next event + time = delivery + break + } else if (delivery < time) { + // Tick has already occurred + logger.warn { "Message processed out of order" } + } + + queue.poll() + + // If the sender has canceled the message, we move on to the next message + if (envelope.canceled) { + continue + } + + val context = envelope.destination.context ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = time + } + } + + override fun run() { + while (queue.isNotEmpty()) { + step() + } + } + + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (time >= until) { + return + } + + while (time < until && queue.isNotEmpty()) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (time > until) { + time = until + } + } + + /** + * The identifier for the next message to be scheduled. + */ + private var nextId: Long = 0 + + /** + * A wrapper around a message that has been scheduled for processing. + * + * @property id The identifier of the message to keep the priority queue stable + * @property message The message to wrap. + * @property time The point in time to deliver the message. + * @property sender The sender of the message. + * @property destination The destination of the message. + */ + private data class Envelope(val id: Long, + val message: Any, + val time: Instant, + val sender: Entity<*, *>?, + val destination: Entity<*, *>) { + /** + * A flag to indicate the message has been canceled. + */ + internal var canceled: Boolean = false + } + + /** + * Schedule the given envelope to be processed by the kernel. + * + * @param envelope The envelope containing the message to schedule. + */ + private fun schedule(envelope: Envelope) { + queue.add(envelope) + } + + /** + * Prepare a message for scheduling by wrapping it into an envelope. + * + * @param message The message to send. + * @param destination The destination entity that should receive the message. + * @param sender The optional sender of the message. + * @param delay The time to delay the message. + */ + private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, + delay: Duration): Envelope { + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } + return Envelope(nextId++, message, time + delay, sender, destination) + } + + /** + * Launch the given [Process]. + * + * @param process The process to launch. + */ + private fun launch(process: Process<*, M>) { + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit>, + AbstractCoroutineContextElement(Context) { + /** + * The model in which the process exists. + */ + override val model: M + get() = this@OmegaSimulation.model + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = this@OmegaSimulation.time + + /** + * The [Entity] associated with this context. + */ + override val self: Entity<S, M> + get() = process + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Context] has executed some work. + */ + override val delta: Duration + get() = maxOf(time - last, 0) + + /** + * The state of the entity. + */ + override var state: S = process.initialState + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + override val <T : Entity<S, *>, S> T.state: S + get() = context?.state ?: initialState + + /** + * The sender of the last received message or `null` in case the process has not received any messages yet. + */ + override var sender: Entity<*, *>? = null + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = this + + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation<Envelope> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = -1 + + override suspend fun receive(): Any = receiveEnvelope().message + + override suspend fun receive(timeout: Duration): Any? { + val send = prepare(Timeout, process, process, timeout).also { schedule(it) } + + try { + val received = receiveEnvelope() + + if (received.message != Timeout) { + send.canceled = true + return received.message + } + + return null + } finally { + send.canceled = true + } + } + + override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = + schedule(prepare(msg, this, sender, delay)) + + override suspend fun Entity<*, *>.interrupt(interrupt: Interrupt) = send(interrupt) + + override suspend fun hold(duration: Duration) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + if (receive() == Resume) + return + } + } finally { + envelope.canceled = true + } + } + + override suspend fun hold(duration: Duration, queue: Queue<Any>) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + val msg = receive() + if (msg == Resume) + return + queue.add(msg) + } + } finally { + envelope.canceled = true + } + } + + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { run() } + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope() = suspendCoroutine<Envelope> { continuation = it } + .also { sender = it.sender } + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) { + // Deregister process from registry in order to have the GC collect this context + registry.remove(process) + } + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + // Deregister process from registry in order to have the GC collect this context:w + registry.remove(process) + + logger.error(exception) { "An exception occurred during the execution of a process" } + } + } +} diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt new file mode 100644 index 00000000..c47f9a26 --- /dev/null +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -0,0 +1,143 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Bootstrap +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Process +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +/** + * This test suite checks for smoke when running a large amount of simulations. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class SmokeTest { + class EchoProcess : Process<Unit, Unit> { + override val initialState = Unit + override suspend fun Context<Unit, Unit>.run() { + while (true) { + val msg = receive() + sender?.send(msg) + } + } + } + + /** + * Run a large amount of simulations and test if any exceptions occur. + */ + @Test + fun smoke() { + val n = 1000 + val messages = 100 + val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx -> + repeat(n) { + EchoProcess().also { + ctx.register(it) + + for (i in 1 until messages) { + ctx.schedule(i, it, delay = i.toLong()) + } + } + } + } + val simulation = OmegaKernel.create(bootstrap) + simulation.run() + } + + object NullProcess : Process<Unit, Unit> { + override val initialState = Unit + override suspend fun Context<Unit, Unit>.run() {} + } + + /** + * Test if the kernel allows sending messages to [Context] instances that have already stopped. + */ + @Test + fun `sending message to process that has gracefully stopped`() { + val process = NullProcess + val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx -> + process.also { + ctx.register(it) + ctx.schedule(0, it) + } + } + + val simulation = OmegaKernel.create(bootstrap) + simulation.run() + } + + object CrashProcess : Process<Unit, Unit> { + override val initialState = Unit + override suspend fun Context<Unit, Unit>.run() { + TODO("This process should crash") + } + } + + /** + * Test if the kernel allows sending messages to [Context] instances that have crashed. + */ + @Test + fun `sending message to process that has crashed`() { + val process = CrashProcess + val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx -> + process.also { + ctx.register(it) + ctx.schedule(0, it) + } + } + + val simulation = OmegaKernel.create(bootstrap) + simulation.run() + } + + class ModelProcess(private val value: Int) : Process<Boolean, Int> { + override val initialState = false + override suspend fun Context<Boolean, Int>.run() { + assertEquals(value, model) + state = true + hold(10) + } + } + /** + * Test if the kernel allows access to the simulation model object. + */ + @Test + fun `access simulation model`() { + val value = 1 + val process = ModelProcess(value) + val bootstrap: Bootstrap<Int> = Bootstrap.create { ctx -> + ctx.register(process) + value + } + + val simulation = OmegaKernel.create(bootstrap) + simulation.run(5) + + assertTrue(simulation.run { process.state }) + } +} |
