diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-14 12:32:57 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-14 12:32:57 +0100 |
| commit | 157d30beb52c75831e29a1a22c199b95d6d30b42 (patch) | |
| tree | c425bd9c5c6c76aa675634dd8c21590055f26f65 | |
| parent | 2e819d59934b5308bc58d6c69709112e2a6af402 (diff) | |
refactor(#18): Create distinction between kernel and simulation
This change creates a distinction between a kernel and a simulation.
A single simulation is represented by a `Simulation` object which
provides control over the simulation, while the `Kernel` interface
allows users to create a new simulation using that kernel as backend.
13 files changed, 453 insertions, 458 deletions
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt index 10a89704..5f41c727 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/Bootstrap.kt @@ -9,15 +9,15 @@ package com.atlarge.opendc.simulator */ interface Bootstrap<M> { /** - * Bootstrap a model `M` for a kernel in the given context. + * Apply the apply procedure for model `M` for a simulation in the given context. * - * @param context The context to bootstrap to model in. - * @return The initialised model for the simulation. + * @param context The context to apply to model in. + * @return The initialised, resulting model for the simulation. */ - fun bootstrap(context: Context<M>): M + fun apply(context: Context<M>): M /** - * A context for the bootstrap of some model type `M` that allows the model to register the entities of the model to + * A context for the apply of some model type `M` that allows the model to register the entities of the model to * the simulation kernel. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) @@ -40,7 +40,7 @@ interface Bootstrap<M> { fun deregister(entity: Entity<*, M>): Boolean /** - * Schedule a message for processing by a [Context]. + * Schedule a message to be received by the given [Entity]. * * @param message The message to schedule. * @param destination The destination of the message. @@ -52,13 +52,13 @@ interface Bootstrap<M> { companion object { /** - * Create a [Bootstrap] procedure using the given block to produce a bootstrap for a model of type `M`. + * Create a [Bootstrap] procedure using the given block to produce a apply for a model of type `M`. * - * @param block The block to produce the bootstrap. - * @return The bootstrap procedure that has been built. + * @param block The block to produce the apply. + * @return The apply procedure that has been built. */ fun <M> create(block: (Context<M>) -> M): Bootstrap<M> = object : Bootstrap<M> { - override fun bootstrap(context: Context<M>) = block(context) + override fun apply(context: Context<M>) = block(context) } } } diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt index 29b3bdee..d4995283 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Kernel.kt @@ -25,55 +25,19 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Bootstrap -import com.atlarge.opendc.simulator.Entity -import com.atlarge.opendc.simulator.Instant /** - * A message based discrete event simulation kernel. + * A message-based discrete event simulator (DES). This interface is a factory for creating [Simulation]s using the + * provided [Bootstrap] for the model. * - * The kernel is created by bootstrapping some model `M` (see [Bootstrap]) to simulate and controls the simulation by - * allowing the user to step over cycles in the simulation and inspect the internal state using [Entity.state]. - * - * A kernel should provide additionally a [KernelFactory] to create new kernel instances given a certain model - * [Bootstrap]. - * - * @param M The shape of the model over which the simulation runs. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Kernel<out M> { - /** - * The model in which the simulation runs. - */ - val model: M - - /** - * The simulation time. - */ - var time: Instant - - /** - * The observable state of an [Entity] in simulation, which is provided by the simulation context. - */ - val <E : Entity<S, *>, S> E.state: S - - /** - * Step through one cycle in the simulation. This method will process all events in a single tick, update the - * internal clock and then return the control to the user. - */ - fun step() - - /** - * Run a simulation over the specified model. - * This method will step through multiple cycles in the simulation until no more message exist in the queue. - */ - fun run() - +interface Kernel { /** - * Run a simulation over the specified model, stepping through cycles until the specified clock tick has - * occurred. The control is then handed back to the user. + * Create a simulation over the given model facilitated by this simulation kernel. * - * @param until The point in simulation time at which the simulation should be paused and the control is handed - * back to the user. + * @param bootstrap The apply procedure to apply the simulation with. + * @return A [Simulation] instance representing the simulation. */ - fun run(until: Instant) + fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> } diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt deleted file mode 100644 index 30abb7ca..00000000 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/KernelFactory.kt +++ /dev/null @@ -1,42 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.simulator.kernel - -import com.atlarge.opendc.simulator.Bootstrap - -/** - * A factory for bootstrapping simulation [Kernel] instances. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -interface KernelFactory { - /** - * Create a simulation over the given model facilitated by this simulation kernel. - * - * @param bootstrap The bootstrap procedure to bootstrap the simulation with. - * @return A [Kernel] instance to control the simulation. - */ - fun <M> create(bootstrap: Bootstrap<M>): Kernel<M> -} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt new file mode 100644 index 00000000..bb2ef818 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -0,0 +1,74 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.simulator.kernel + +import com.atlarge.opendc.simulator.Entity +import com.atlarge.opendc.simulator.Instant + +/** + * A message based discrete event simulation over some model `M`. This interface provides direct control over the + * simulation, allowing the user to step over cycles of the simulation and inspecting the state of the simulation via + * [Entity.state]. + * + * @param M The shape of the model over which the simulation runs. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Simulation<out M> { + /** + * The model in which the simulation runs. + */ + val model: M + + /** + * The simulation time. + */ + var time: Instant + + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ + val <E : Entity<S, *>, S> E.state: S + + /** + * Step through one cycle in the simulation. This method will process all events in a single tick, update the + * internal clock and then return the control to the user. + */ + fun step() + + /** + * Run a simulation over the specified model. + * This method will step through multiple cycles in the simulation until no more message exist in the queue. + */ + fun run() + + /** + * Run a simulation over the specified model, stepping through cycles until the specified clock tick has + * occurred. The control is then handed back to the user. + * + * @param until The point in simulation time at which the simulation should be paused and the control is handed + * back to the user. + */ + fun run(until: Instant) +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt index 92d56be1..eb959ded 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/platform/Experiment.kt @@ -25,7 +25,7 @@ package com.atlarge.opendc.simulator.platform import com.atlarge.opendc.simulator.Duration -import com.atlarge.opendc.simulator.kernel.KernelFactory +import com.atlarge.opendc.simulator.kernel.Kernel /** * A blueprint for a reproducible simulation in a pre-defined setting. @@ -39,7 +39,7 @@ interface Experiment<out T> { * @param factory The factory to create the simulation kernel with. * @return The result of the experiment. */ - fun run(factory: KernelFactory): T + fun run(factory: Kernel): T /** * Run the experiment on the specified kernel implementation. @@ -48,5 +48,5 @@ interface Experiment<out T> { * @param timeout The maximum duration of the experiment before returning to the caller. * @return The result of the experiment or `null`. */ - fun run(factory: KernelFactory, timeout: Duration): T? + fun run(factory: Kernel, timeout: Duration): T? } diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index 6ece73aa..e0d70eed 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -24,11 +24,9 @@ package com.atlarge.opendc.omega -import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.Bootstrap +import com.atlarge.opendc.simulator.kernel.Simulation import com.atlarge.opendc.simulator.kernel.Kernel -import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.experimental.* /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. @@ -36,296 +34,14 @@ import kotlin.coroutines.experimental.* * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. * - * @property model The model that is simulated. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Context<M> { +object OmegaKernel : Kernel { /** - * The logger instance to use for the simulator. + * Create a simulation over the given model facilitated by this simulation kernel. + * + * @param bootstrap The apply procedure to apply the simulation with. + * @return A [Simulation] instance to control the simulation. */ - private val logger = KotlinLogging.logger {} - - /** - * The registry of the simulation kernels used in the experiment. - */ - private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap() - - /** - * The message queue. - */ - private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) - - /** - * The processes to be spawned. - */ - private val spawnings: Queue<Process<*, M>> = ArrayDeque() - - /** - * The simulation time. - */ - override var time: Instant = 0 - - /** - * The model of simulation. - */ - override val model: M = bootstrap.bootstrap(this) - - override val <E : Entity<S, *>, S> E.state: S - get() = context?.state ?: initialState - - /** - * The context associated with an [Entity]. - */ - @Suppress("UNCHECKED_CAST") - private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>? - get() = registry[this] as? OmegaContext<S> - - override fun register(entity: Entity<*, M>): Boolean { - if (!registry.containsKey(entity) && entity !is Process) { - return false - } - - val process = entity as Process<*, M> - spawnings.add(process) - return true - } - - override fun deregister(entity: Entity<*, M>): Boolean { - val context = entity.context ?: return false - context.resume(Unit) - return true - } - - override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = - schedule(prepare(message, destination, sender, delay)) - - override fun step() { - while (true) { - // Initialise all spawned processes - while (spawnings.isNotEmpty()) { - val process = spawnings.poll() - val context = OmegaContext(process).also { registry[process] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { context.start() } - block.startCoroutine(context) - - } - - val envelope = queue.peek() ?: return - val delivery = envelope.time - - if (delivery > time) { - // Tick has yet to occur - // Jump in time to next event - time = delivery - break - } else if (delivery < time) { - // Tick has already occurred - logger.warn { "Message processed out of order" } - } - - queue.poll() - - // If the sender has canceled the message, we move on to the next message - if (envelope.canceled) { - continue - } - - val context = envelope.destination.context ?: continue - - if (envelope.message !is Interrupt) { - context.continuation.resume(envelope) - } else { - context.continuation.resumeWithException(envelope.message) - } - - context.last = time - } - } - - - override fun run() { - while (queue.isNotEmpty() || spawnings.isNotEmpty()) { - step() - } - } - - override fun run(until: Instant) { - require(until > 0) { "The given instant must be a non-zero positive number" } - - if (time >= until) { - return - } - - while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { - step() - } - - // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at - // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will - // just jump forward again. - if (time > until) { - time = until - } - } - - private fun schedule(envelope: MessageContainer) { - queue.add(envelope) - } - - private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { - require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(message, time + delay, sender, destination) - } - - /** - * This internal class provides the default implementation for the [Context] interface for this simulator. - */ - private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation<Envelope<*>> - - /** - * The last point in time the process has done some work. - */ - var last: Instant = -1 - - /** - * The model in which the process exists. - */ - override val model: M - get() = this@OmegaKernel.model - - /** - * The state of the entity. - */ - override var state: S = process.initialState - - /** - * The current point in simulation time. - */ - override val time: Instant - get() = this@OmegaKernel.time - - /** - * The duration between the current point in simulation time and the last point in simulation time where the - * [Context] has executed some work. - */ - override val delta: Duration - get() = maxOf(time - last, 0) - - /** - * The [CoroutineContext] for a [Context]. - */ - override val context: CoroutineContext = EmptyCoroutineContext - - /** - * The observable state of an [Entity] within the simulation is provided by the context of the simulation. - */ - override val <T : Entity<S, *>, S> T.state: S - get() = context?.state ?: initialState - - /** - * Start the process associated with this context. - */ - internal suspend fun start() = process.run { - run() - } - - /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the - * message has been received. - * - * @return The envelope containing the message. - */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } - - override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - - - override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { - val send = prepare(Timeout, process, process, timeout).also { schedule(it) } - - try { - val received = receiveEnvelope() - - if (received.message == Timeout) { - send.canceled = true - return transform(received, received.message) - } - - return null - } finally { - send.canceled = true - } - } - - override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) - - override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = - schedule(prepare(msg, this, sender, delay)) - - override suspend fun Entity<*, *>.interrupt() = send(Interrupt) - - override suspend fun hold(duration: Duration) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - if (receive() == Resume) - return - } - } finally { - envelope.canceled = true - } - } - - override suspend fun hold(duration: Duration, queue: Queue<Any>) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - val msg = receive() - if (msg == Resume) - return - queue.add(msg) - } - } finally { - envelope.canceled = true - } - } - - - // Completion continuation implementation - /** - * Resume the execution of this continuation with the given value. - * - * @param value The value to resume with. - */ - override fun resume(value: Unit) { - // Deregister process from registry in order to have the GC collect this context - registry.remove(process) - } - - /** - * Resume the execution of this continuation with an exception. - * - * @param exception The exception to resume with. - */ - override fun resumeWithException(exception: Throwable) { - // Deregister process from registry in order to have the GC collect this context:w - registry.remove(process) - - logger.error(exception) { "An exception occurred during the execution of a process" } - } - } + override fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> = OmegaSimulation(bootstrap) } diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt deleted file mode 100644 index 139cbd19..00000000 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.omega - -import com.atlarge.opendc.simulator.Bootstrap -import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.kernel.KernelFactory - -/** - * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. - * - * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and - * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -object OmegaKernelFactory : KernelFactory { - /** - * Create a simulation over the given model facilitated by this simulation kernel. - * - * @param bootstrap The bootstrap procedure to bootstrap the simulation with. - * @return A [Kernel] instance to control the simulation. - */ - override fun <M> create(bootstrap: Bootstrap<M>): Kernel<M> = OmegaKernel(bootstrap) -} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt new file mode 100644 index 00000000..80ac4600 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -0,0 +1,331 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.kernel.Simulation +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @property model The model that is simulated. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Bootstrap.Context<M> { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * The processes to be spawned. + */ + private val spawnings: Queue<Process<*, M>> = ArrayDeque() + + /** + * The simulation time. + */ + override var time: Instant = 0 + + /** + * The model of simulation. + */ + override val model: M = bootstrap.apply(this) + + override val <E : Entity<S, *>, S> E.state: S + get() = context?.state ?: initialState + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>? + get() = registry[this] as? OmegaContext<S> + + override fun register(entity: Entity<*, M>): Boolean { + if (!registry.containsKey(entity) && entity !is Process) { + return false + } + + val process = entity as Process<*, M> + spawnings.add(process) + return true + } + + override fun deregister(entity: Entity<*, M>): Boolean { + val context = entity.context ?: return false + context.resume(Unit) + return true + } + + override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = + schedule(prepare(message, destination, sender, delay)) + + override fun step() { + while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > time) { + // Tick has yet to occur + // Jump in time to next event + time = delivery + break + } else if (delivery < time) { + // Tick has already occurred + logger.warn { "Message processed out of order" } + } + + queue.poll() + + // If the sender has canceled the message, we move on to the next message + if (envelope.canceled) { + continue + } + + val context = envelope.destination.context ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = time + } + } + + + override fun run() { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { + step() + } + } + + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (time >= until) { + return + } + + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (time > until) { + time = until + } + } + + private fun schedule(envelope: MessageContainer) { + queue.add(envelope) + } + + private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, + delay: Duration): MessageContainer { + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } + return MessageContainer(message, time + delay, sender, destination) + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation<Envelope<*>> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = -1 + + /** + * The model in which the process exists. + */ + override val model: M + get() = this@OmegaSimulation.model + + /** + * The state of the entity. + */ + override var state: S = process.initialState + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = this@OmegaSimulation.time + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Context] has executed some work. + */ + override val delta: Duration + get() = maxOf(time - last, 0) + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + override val <T : Entity<S, *>, S> T.state: S + get() = context?.state ?: initialState + + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + + override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return transform(envelope, envelope.message) + } + + + override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + val send = prepare(Timeout, process, process, timeout).also { schedule(it) } + + try { + val received = receiveEnvelope() + + if (received.message == Timeout) { + send.canceled = true + return transform(received, received.message) + } + + return null + } finally { + send.canceled = true + } + } + + override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) + + override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = + schedule(prepare(msg, this, sender, delay)) + + override suspend fun Entity<*, *>.interrupt() = send(Interrupt) + + override suspend fun hold(duration: Duration) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + if (receive() == Resume) + return + } + } finally { + envelope.canceled = true + } + } + + override suspend fun hold(duration: Duration, queue: Queue<Any>) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + val msg = receive() + if (msg == Resume) + return + queue.add(msg) + } + } finally { + envelope.canceled = true + } + } + + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) { + // Deregister process from registry in order to have the GC collect this context + registry.remove(process) + } + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + // Deregister process from registry in order to have the GC collect this context:w + registry.remove(process) + + logger.error(exception) { "An exception occurred during the execution of a process" } + } + } +} diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 48b4f02a..74fa686b 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -66,8 +66,8 @@ internal class SmokeTest { } } } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() + val simulation = OmegaKernel.create(bootstrap) + simulation.run() } object NullProcess : Process<Unit, Unit> { @@ -88,8 +88,8 @@ internal class SmokeTest { } } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() + val simulation = OmegaKernel.create(bootstrap) + simulation.run() } object CrashProcess : Process<Unit, Unit> { @@ -112,8 +112,8 @@ internal class SmokeTest { } } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() + val simulation = OmegaKernel.create(bootstrap) + simulation.run() } class ModelProcess(private val value: Int) : Process<Boolean, Int> { @@ -136,9 +136,9 @@ internal class SmokeTest { value } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run(5) + val simulation = OmegaKernel.create(bootstrap) + simulation.run(5) - assertTrue(kernel.run { process.state }) + assertTrue(simulation.run { process.state }) } } diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt index 8c32c54d..0a0c792c 100644 --- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt +++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/JpaBootstrap.kt @@ -21,20 +21,20 @@ class JpaBootstrap(val experiment: Experiment) : Bootstrap<JpaModel> { /** * Bootstrap a model `M` for a kernel in the given context. * - * @param context The context to bootstrap to model in. + * @param context The context to apply to model in. * @return The initialised model for the simulation. */ - override fun bootstrap(context: Bootstrap.Context<JpaModel>): JpaModel { + override fun apply(context: Bootstrap.Context<JpaModel>): JpaModel { val section = experiment.path.sections.first() - // TODO We should not modify parts of the experiment in a bootstrap as the bootstrap should be reproducible. + // TODO We should not modify parts of the experiment in a apply as the apply should be reproducible. // Important: initialise the scheduler of the datacenter section.datacenter.scheduler = experiment.scheduler val topology = JpaTopologyFactory(section) .create() .bootstrap() - .bootstrap(context) + .apply(context) val trace = experiment.trace val tasks = trace.jobs.flatMap { it.tasks } diff --git a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt index 74f96ccb..4e88b3d4 100644 --- a/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt +++ b/opendc-model-odc/jpa/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaExperiment.kt @@ -34,7 +34,7 @@ import com.atlarge.opendc.model.odc.topology.container.Room import com.atlarge.opendc.model.odc.topology.machine.Machine import com.atlarge.opendc.model.topology.destinations import com.atlarge.opendc.simulator.Duration -import com.atlarge.opendc.simulator.kernel.KernelFactory +import com.atlarge.opendc.simulator.kernel.Kernel import com.atlarge.opendc.simulator.platform.Experiment import mu.KotlinLogging import java.io.Closeable @@ -65,7 +65,7 @@ class JpaExperiment(private val manager: EntityManager, * @param timeout The maximum duration of the experiment before returning to the caller. * @return The result of the experiment or `null`. */ - override fun run(factory: KernelFactory, timeout: Duration): Unit? { + override fun run(factory: Kernel, timeout: Duration): Unit? { if (experiment.state != ExperimentState.CLAIMED) { throw IllegalStateException("The experiment is in illegal state ${experiment.state}") } @@ -136,7 +136,7 @@ class JpaExperiment(private val manager: EntityManager, experiment.state = ExperimentState.FINISHED } - logger.info { "Kernel done" } + logger.info { "Simulation done" } val waiting: Long = tasks.fold(0.toLong()) { acc, task -> val finished = task.state as TaskState.Finished acc + (finished.previous.at - finished.previous.previous.at) @@ -165,7 +165,7 @@ class JpaExperiment(private val manager: EntityManager, * @param factory The factory to create the simulation kernel with. * @throws IllegalStateException if the simulation is already running or finished. */ - override fun run(factory: KernelFactory) = run(factory, -1)!! + override fun run(factory: Kernel) = run(factory, -1)!! /** * Closes this resource, relinquishing any underlying resources. diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt index eb819a5b..13f322bc 100644 --- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt +++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt @@ -24,8 +24,7 @@ package com.atlarge.opendc.model.odc.platform -import com.atlarge.opendc.model.odc.platform.JpaExperimentManager -import com.atlarge.opendc.omega.OmegaKernelFactory +import com.atlarge.opendc.omega.OmegaKernel import mu.KotlinLogging import java.util.concurrent.Executors import javax.persistence.Persistence @@ -50,7 +49,7 @@ fun main(args: Array<String>) { val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) - val kernel = OmegaKernelFactory + val kernel = OmegaKernel logger.info { "Waiting for enqueued experiments..." } while (true) { diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt index 1a5bbcaf..e0b54a28 100644 --- a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt +++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/model/topology/Bootstrap.kt @@ -6,7 +6,7 @@ import com.atlarge.opendc.simulator.Entity /** * Create a [Bootstrap] procedure for the given [Topology]. * - * @return A bootstrap procedure for the topology. + * @return A apply procedure for the topology. */ fun <T : Topology> T.bootstrap(): Bootstrap<T> = Bootstrap.create { ctx -> forEach { ctx.register(it) } |
