diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-14 12:32:57 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-14 12:32:57 +0100 |
| commit | 157d30beb52c75831e29a1a22c199b95d6d30b42 (patch) | |
| tree | c425bd9c5c6c76aa675634dd8c21590055f26f65 /opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | |
| parent | 2e819d59934b5308bc58d6c69709112e2a6af402 (diff) | |
refactor(#18): Create distinction between kernel and simulation
This change creates a distinction between a kernel and a simulation.
A single simulation is represented by a `Simulation` object which
provides control over the simulation, while the `Kernel` interface
allows users to create a new simulation using that kernel as backend.
Diffstat (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt')
| -rw-r--r-- | opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 300 |
1 files changed, 8 insertions, 292 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index 6ece73aa..e0d70eed 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -24,11 +24,9 @@ package com.atlarge.opendc.omega -import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.Bootstrap +import com.atlarge.opendc.simulator.kernel.Simulation import com.atlarge.opendc.simulator.kernel.Kernel -import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.experimental.* /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. @@ -36,296 +34,14 @@ import kotlin.coroutines.experimental.* * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. * - * @property model The model that is simulated. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Context<M> { +object OmegaKernel : Kernel { /** - * The logger instance to use for the simulator. + * Create a simulation over the given model facilitated by this simulation kernel. + * + * @param bootstrap The apply procedure to apply the simulation with. + * @return A [Simulation] instance to control the simulation. */ - private val logger = KotlinLogging.logger {} - - /** - * The registry of the simulation kernels used in the experiment. - */ - private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap() - - /** - * The message queue. - */ - private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) - - /** - * The processes to be spawned. - */ - private val spawnings: Queue<Process<*, M>> = ArrayDeque() - - /** - * The simulation time. - */ - override var time: Instant = 0 - - /** - * The model of simulation. - */ - override val model: M = bootstrap.bootstrap(this) - - override val <E : Entity<S, *>, S> E.state: S - get() = context?.state ?: initialState - - /** - * The context associated with an [Entity]. - */ - @Suppress("UNCHECKED_CAST") - private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>? - get() = registry[this] as? OmegaContext<S> - - override fun register(entity: Entity<*, M>): Boolean { - if (!registry.containsKey(entity) && entity !is Process) { - return false - } - - val process = entity as Process<*, M> - spawnings.add(process) - return true - } - - override fun deregister(entity: Entity<*, M>): Boolean { - val context = entity.context ?: return false - context.resume(Unit) - return true - } - - override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = - schedule(prepare(message, destination, sender, delay)) - - override fun step() { - while (true) { - // Initialise all spawned processes - while (spawnings.isNotEmpty()) { - val process = spawnings.poll() - val context = OmegaContext(process).also { registry[process] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { context.start() } - block.startCoroutine(context) - - } - - val envelope = queue.peek() ?: return - val delivery = envelope.time - - if (delivery > time) { - // Tick has yet to occur - // Jump in time to next event - time = delivery - break - } else if (delivery < time) { - // Tick has already occurred - logger.warn { "Message processed out of order" } - } - - queue.poll() - - // If the sender has canceled the message, we move on to the next message - if (envelope.canceled) { - continue - } - - val context = envelope.destination.context ?: continue - - if (envelope.message !is Interrupt) { - context.continuation.resume(envelope) - } else { - context.continuation.resumeWithException(envelope.message) - } - - context.last = time - } - } - - - override fun run() { - while (queue.isNotEmpty() || spawnings.isNotEmpty()) { - step() - } - } - - override fun run(until: Instant) { - require(until > 0) { "The given instant must be a non-zero positive number" } - - if (time >= until) { - return - } - - while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { - step() - } - - // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at - // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will - // just jump forward again. - if (time > until) { - time = until - } - } - - private fun schedule(envelope: MessageContainer) { - queue.add(envelope) - } - - private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { - require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(message, time + delay, sender, destination) - } - - /** - * This internal class provides the default implementation for the [Context] interface for this simulator. - */ - private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation<Envelope<*>> - - /** - * The last point in time the process has done some work. - */ - var last: Instant = -1 - - /** - * The model in which the process exists. - */ - override val model: M - get() = this@OmegaKernel.model - - /** - * The state of the entity. - */ - override var state: S = process.initialState - - /** - * The current point in simulation time. - */ - override val time: Instant - get() = this@OmegaKernel.time - - /** - * The duration between the current point in simulation time and the last point in simulation time where the - * [Context] has executed some work. - */ - override val delta: Duration - get() = maxOf(time - last, 0) - - /** - * The [CoroutineContext] for a [Context]. - */ - override val context: CoroutineContext = EmptyCoroutineContext - - /** - * The observable state of an [Entity] within the simulation is provided by the context of the simulation. - */ - override val <T : Entity<S, *>, S> T.state: S - get() = context?.state ?: initialState - - /** - * Start the process associated with this context. - */ - internal suspend fun start() = process.run { - run() - } - - /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the - * message has been received. - * - * @return The envelope containing the message. - */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } - - override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - - - override suspend fun <T> receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { - val send = prepare(Timeout, process, process, timeout).also { schedule(it) } - - try { - val received = receiveEnvelope() - - if (received.message == Timeout) { - send.canceled = true - return transform(received, received.message) - } - - return null - } finally { - send.canceled = true - } - } - - override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) - - override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = - schedule(prepare(msg, this, sender, delay)) - - override suspend fun Entity<*, *>.interrupt() = send(Interrupt) - - override suspend fun hold(duration: Duration) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - if (receive() == Resume) - return - } - } finally { - envelope.canceled = true - } - } - - override suspend fun hold(duration: Duration, queue: Queue<Any>) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - val msg = receive() - if (msg == Resume) - return - queue.add(msg) - } - } finally { - envelope.canceled = true - } - } - - - // Completion continuation implementation - /** - * Resume the execution of this continuation with the given value. - * - * @param value The value to resume with. - */ - override fun resume(value: Unit) { - // Deregister process from registry in order to have the GC collect this context - registry.remove(process) - } - - /** - * Resume the execution of this continuation with an exception. - * - * @param exception The exception to resume with. - */ - override fun resumeWithException(exception: Throwable) { - // Deregister process from registry in order to have the GC collect this context:w - registry.remove(process) - - logger.error(exception) { "An exception occurred during the execution of a process" } - } - } + override fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> = OmegaSimulation(bootstrap) } |
