From 157d30beb52c75831e29a1a22c199b95d6d30b42 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:32:57 +0100 Subject: refactor(#18): Create distinction between kernel and simulation This change creates a distinction between a kernel and a simulation. A single simulation is represented by a `Simulation` object which provides control over the simulation, while the `Kernel` interface allows users to create a new simulation using that kernel as backend. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 331 +++++++++++++++++++++ 1 file changed, 331 insertions(+) create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt new file mode 100644 index 00000000..80ac4600 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -0,0 +1,331 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.kernel.Simulation +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @property model The model that is simulated. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Bootstrap.Context { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap, OmegaContext<*>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * The processes to be spawned. + */ + private val spawnings: Queue> = ArrayDeque() + + /** + * The simulation time. + */ + override var time: Instant = 0 + + /** + * The model of simulation. + */ + override val model: M = bootstrap.apply(this) + + override val , S> E.state: S + get() = context?.state ?: initialState + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val , S, M> E.context: OmegaContext? + get() = registry[this] as? OmegaContext + + override fun register(entity: Entity<*, M>): Boolean { + if (!registry.containsKey(entity) && entity !is Process) { + return false + } + + val process = entity as Process<*, M> + spawnings.add(process) + return true + } + + override fun deregister(entity: Entity<*, M>): Boolean { + val context = entity.context ?: return false + context.resume(Unit) + return true + } + + override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = + schedule(prepare(message, destination, sender, delay)) + + override fun step() { + while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > time) { + // Tick has yet to occur + // Jump in time to next event + time = delivery + break + } else if (delivery < time) { + // Tick has already occurred + logger.warn { "Message processed out of order" } + } + + queue.poll() + + // If the sender has canceled the message, we move on to the next message + if (envelope.canceled) { + continue + } + + val context = envelope.destination.context ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = time + } + } + + + override fun run() { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { + step() + } + } + + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (time >= until) { + return + } + + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (time > until) { + time = until + } + } + + private fun schedule(envelope: MessageContainer) { + queue.add(envelope) + } + + private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, + delay: Duration): MessageContainer { + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } + return MessageContainer(message, time + delay, sender, destination) + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext(val process: Process) : Context, Continuation { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = -1 + + /** + * The model in which the process exists. + */ + override val model: M + get() = this@OmegaSimulation.model + + /** + * The state of the entity. + */ + override var state: S = process.initialState + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = this@OmegaSimulation.time + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Context] has executed some work. + */ + override val delta: Duration + get() = maxOf(time - last, 0) + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + override val , S> T.state: S + get() = context?.state ?: initialState + + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + + override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return transform(envelope, envelope.message) + } + + + override suspend fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + val send = prepare(Timeout, process, process, timeout).also { schedule(it) } + + try { + val received = receiveEnvelope() + + if (received.message == Timeout) { + send.canceled = true + return transform(received, received.message) + } + + return null + } finally { + send.canceled = true + } + } + + override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) + + override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = + schedule(prepare(msg, this, sender, delay)) + + override suspend fun Entity<*, *>.interrupt() = send(Interrupt) + + override suspend fun hold(duration: Duration) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + if (receive() == Resume) + return + } + } finally { + envelope.canceled = true + } + } + + override suspend fun hold(duration: Duration, queue: Queue) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + val msg = receive() + if (msg == Resume) + return + queue.add(msg) + } + } finally { + envelope.canceled = true + } + } + + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) { + // Deregister process from registry in order to have the GC collect this context + registry.remove(process) + } + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + // Deregister process from registry in order to have the GC collect this context:w + registry.remove(process) + + logger.error(exception) { "An exception occurred during the execution of a process" } + } + } +} -- cgit v1.2.3 From 99dc5592bbca0f7d3a9d411268608c112b135896 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 15 Feb 2018 16:18:55 +0100 Subject: refactor(#18): Handle process launch using a kernel process This change will make the kernel handle the launch of processes using a kernel process that is launched at the start of the simulation and launches processes when it receives `Launch` messages. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 142 ++++++++++++++------- 1 file changed, 99 insertions(+), 43 deletions(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 80ac4600..71b20e34 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -46,7 +46,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot private val logger = KotlinLogging.logger {} /** - * The registry of the simulation kernels used in the experiment. + * The registry of the processes used in the simulation. */ private val registry: MutableMap, OmegaContext<*>> = HashMap() @@ -56,9 +56,29 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** - * The processes to be spawned. + * The kernel process instance that handles internal operations during the simulation. */ - private val spawnings: Queue> = ArrayDeque() + private val process = object : Process { + override val initialState = Unit + + override suspend fun Context.run() { + while(true) { + val msg = receive() + when (msg) { + is Launch<*> -> + @Suppress("UNCHECKED_CAST") + launch((msg as Launch).process) + } + } + } + } + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val , S, M> E.context: OmegaContext? + get() = registry[this] as? OmegaContext /** * The simulation time. @@ -68,25 +88,30 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The model of simulation. */ - override val model: M = bootstrap.apply(this) + // XXX: the bootstrap requires the properties of this class to be initialised, so changing the order may cause NPEs + override var model: M = bootstrap.apply(this) + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ override val , S> E.state: S get() = context?.state ?: initialState /** - * The context associated with an [Entity]. + * Initialise the simulation instance. */ - @Suppress("UNCHECKED_CAST") - private val , S, M> E.context: OmegaContext? - get() = registry[this] as? OmegaContext + init { + // Launch the Omega kernel process + launch(process) + } + // Bootstrap Context implementation override fun register(entity: Entity<*, M>): Boolean { if (!registry.containsKey(entity) && entity !is Process) { return false } - val process = entity as Process<*, M> - spawnings.add(process) + schedule(Launch(entity as Process<*, M>), process) return true } @@ -99,19 +124,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = schedule(prepare(message, destination, sender, delay)) + // Simulation implementation override fun step() { while (true) { - // Initialise all spawned processes - while (spawnings.isNotEmpty()) { - val process = spawnings.poll() - val context = OmegaContext(process).also { registry[process] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { context.start() } - block.startCoroutine(context) - - } - val envelope = queue.peek() ?: return val delivery = envelope.time @@ -144,9 +159,8 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } - override fun run() { - while (queue.isNotEmpty() || spawnings.isNotEmpty()) { + while (queue.isNotEmpty()) { step() } } @@ -158,7 +172,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot return } - while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { + while (time < until && queue.isNotEmpty()) { step() } @@ -170,10 +184,42 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } + /** + * A wrapper around a message that has been scheduled for processing. + * + * @property message The message to wrap. + * @property time The point in time to deliver the message. + * @property sender The sender of the message. + * @property destination The destination of the message. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ + private data class MessageContainer(override val message: Any, + val time: Instant, + override val sender: Entity<*, *>?, + override val destination: Entity<*, *>) : Envelope { + /** + * A flag to indicate the message has been canceled. + */ + internal var canceled: Boolean = false + } + + /** + * Schedule the given envelope to be processed by the kernel. + * + * @param envelope The envelope containing the message to schedule. + */ private fun schedule(envelope: MessageContainer) { queue.add(envelope) } + /** + * Prepare a message for scheduling by wrapping it into an envelope. + * + * @param message The message to send. + * @param destination The destination entity that should receive the message. + * @param sender The optional sender of the message. + * @param delay The time to delay the message. + */ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, delay: Duration): MessageContainer { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } @@ -181,19 +227,22 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } /** - * This internal class provides the default implementation for the [Context] interface for this simulator. + * Launch the given [Process]. + * + * @param process The process to launch. */ - private inner class OmegaContext(val process: Process) : Context, Continuation { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation> + private fun launch(process: Process<*, M>) { + val context = OmegaContext(process).also { registry[process] = it } - /** - * The last point in time the process has done some work. - */ - var last: Instant = -1 + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + } + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext(val process: Process) : Context, Continuation { /** * The model in which the process exists. */ @@ -230,19 +279,14 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot get() = context?.state ?: initialState /** - * Start the process associated with this context. + * The continuation to resume the execution of the process. */ - internal suspend fun start() = process.run { - run() - } + lateinit var continuation: Continuation> /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the - * message has been received. - * - * @return The envelope containing the message. + * The last point in time the process has done some work. */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + var last: Instant = -1 override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { val envelope = receiveEnvelope() @@ -304,6 +348,18 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { run() } + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } // Completion continuation implementation /** -- cgit v1.2.3 From 2fa0134773a99394aae0efc167af6767e2828c71 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 16 Feb 2018 14:32:16 +0100 Subject: refactor(#18): Fix broken receive() timeout This change fixes the broken implementation of the `receive()` method with a timeout due to an invalid condition. --- .../src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 71b20e34..9c6d4ab3 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -300,7 +300,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot try { val received = receiveEnvelope() - if (received.message == Timeout) { + if (received.message != Timeout) { send.canceled = true return transform(received, received.message) } -- cgit v1.2.3 From b84a995a05fecb9ef90c9184959f285f324e7411 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 16 Feb 2018 14:39:53 +0100 Subject: refactor(#18): Provide access to latest sender This change adds a `sender` property to the `Context` interface to provide processes access to the sender of the latest received message. Please note that methods like `hold()` and `interrupt()` may change the value of this property. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 9c6d4ab3..bd3f4529 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -249,11 +249,6 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override val model: M get() = this@OmegaSimulation.model - /** - * The state of the entity. - */ - override var state: S = process.initialState - /** * The current point in simulation time. */ @@ -268,9 +263,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot get() = maxOf(time - last, 0) /** - * The [CoroutineContext] for a [Context]. + * The state of the entity. */ - override val context: CoroutineContext = EmptyCoroutineContext + override var state: S = process.initialState /** * The observable state of an [Entity] within the simulation is provided by the context of the simulation. @@ -278,6 +273,16 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override val , S> T.state: S get() = context?.state ?: initialState + /** + * The sender of the last received message or `null` in case the process has not received any messages yet. + */ + override var sender: Entity<*, *>? = null + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + /** * The continuation to resume the execution of the process. */ @@ -359,7 +364,8 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * * @return The envelope containing the message. */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + suspend fun receiveEnvelope() = suspendCoroutine> { continuation = it } + .also { sender = it.sender } // Completion continuation implementation /** -- cgit v1.2.3 From f8a4095d1824df095ea91253f914bc0512646684 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 16 Feb 2018 15:25:19 +0100 Subject: refactor(#18): Provide access to process context in nested calls This change provides a method in the standard library to access the process context in nested suspending function calls. --- .../main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index bd3f4529..22382ccd 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -242,7 +242,8 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * This internal class provides the default implementation for the [Context] interface for this simulator. */ - private inner class OmegaContext(val process: Process) : Context, Continuation { + private inner class OmegaContext(val process: Process) : Context, Continuation, + AbstractCoroutineContextElement(Context) { /** * The model in which the process exists. */ @@ -255,6 +256,12 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override val time: Instant get() = this@OmegaSimulation.time + /** + * The [Entity] associated with this context. + */ + override val self: Entity + get() = process + /** * The duration between the current point in simulation time and the last point in simulation time where the * [Context] has executed some work. @@ -281,7 +288,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The [CoroutineContext] for a [Context]. */ - override val context: CoroutineContext = EmptyCoroutineContext + override val context: CoroutineContext = this /** * The continuation to resume the execution of the process. @@ -321,7 +328,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = schedule(prepare(msg, this, sender, delay)) - override suspend fun Entity<*, *>.interrupt() = send(Interrupt) + override suspend fun Entity<*, *>.interrupt(interrupt: Interrupt) = send(interrupt) override suspend fun hold(duration: Duration) { require(duration >= 0) { "The amount of time to hold must be a positive number" } -- cgit v1.2.3 From ac0a8505ceb817e33011ff80f8585b199896b9c4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 19 Feb 2018 14:15:55 +0100 Subject: bug: Make message priority queue stable This change fixes the bug where the insertion order into the message queue was not guaranteed for messages arriving at the same time, causing some non-deterministic behaviour. --- .../kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 22382ccd..8c1497c8 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -53,7 +53,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The message queue. */ - private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + private val queue: Queue = PriorityQueue(Comparator + .comparingLong(MessageContainer::time) + .thenComparingLong(MessageContainer::id)) /** * The kernel process instance that handles internal operations during the simulation. @@ -184,16 +186,23 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } + /** + * The identifier for the next message to be scheduled. + */ + private var nextId: Long = 0 + /** * A wrapper around a message that has been scheduled for processing. * + * @property id The identifier of the message to keep the priority queue stable * @property message The message to wrap. * @property time The point in time to deliver the message. * @property sender The sender of the message. * @property destination The destination of the message. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ - private data class MessageContainer(override val message: Any, + private data class MessageContainer(val id: Long, + override val message: Any, val time: Instant, override val sender: Entity<*, *>?, override val destination: Entity<*, *>) : Envelope { @@ -223,7 +232,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, delay: Duration): MessageContainer { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(message, time + delay, sender, destination) + return MessageContainer(nextId++, message, time + delay, sender, destination) } /** -- cgit v1.2.3 From 59247a4f7a2dc948b3a63ff185c64922eb4334ea Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 19 Feb 2018 15:09:19 +0100 Subject: refactor(#18): Refactor unused transformation receive methods This change removes the unused transformation receive methods from the `Context` class as this functionality can now be easily implemented in the standard library using the newly introduced `sender` property. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 37 ++++++++++------------ 1 file changed, 16 insertions(+), 21 deletions(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 8c1497c8..4d94bf9e 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -53,9 +53,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The message queue. */ - private val queue: Queue = PriorityQueue(Comparator - .comparingLong(MessageContainer::time) - .thenComparingLong(MessageContainer::id)) + private val queue: Queue = PriorityQueue(Comparator + .comparingLong(Envelope::time) + .thenComparingLong(Envelope::id)) /** * The kernel process instance that handles internal operations during the simulation. @@ -199,13 +199,12 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * @property time The point in time to deliver the message. * @property sender The sender of the message. * @property destination The destination of the message. - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ - private data class MessageContainer(val id: Long, - override val message: Any, - val time: Instant, - override val sender: Entity<*, *>?, - override val destination: Entity<*, *>) : Envelope { + private data class Envelope(val id: Long, + val message: Any, + val time: Instant, + val sender: Entity<*, *>?, + val destination: Entity<*, *>) { /** * A flag to indicate the message has been canceled. */ @@ -217,7 +216,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * * @param envelope The envelope containing the message to schedule. */ - private fun schedule(envelope: MessageContainer) { + private fun schedule(envelope: Envelope) { queue.add(envelope) } @@ -230,9 +229,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * @param delay The time to delay the message. */ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { + delay: Duration): Envelope { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(nextId++, message, time + delay, sender, destination) + return Envelope(nextId++, message, time + delay, sender, destination) } /** @@ -302,20 +301,16 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The continuation to resume the execution of the process. */ - lateinit var continuation: Continuation> + lateinit var continuation: Continuation /** * The last point in time the process has done some work. */ var last: Instant = -1 - override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - + override suspend fun receive(): Any = receiveEnvelope().message - override suspend fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + override suspend fun receive(timeout: Duration): Any? { val send = prepare(Timeout, process, process, timeout).also { schedule(it) } try { @@ -323,7 +318,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot if (received.message != Timeout) { send.canceled = true - return transform(received, received.message) + return received.message } return null @@ -380,7 +375,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * * @return The envelope containing the message. */ - suspend fun receiveEnvelope() = suspendCoroutine> { continuation = it } + suspend fun receiveEnvelope() = suspendCoroutine { continuation = it } .also { sender = it.sender } // Completion continuation implementation -- cgit v1.2.3 From 86dc826db4cd91b5a6875d9ecdd64c9238d7b95c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 19 Feb 2018 15:12:16 +0100 Subject: refactor(#18): Simplify Context interface This change simplifies the `Context` interface to reduce the amount of methods required to implement by implementors. --- .../src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 2 -- 1 file changed, 2 deletions(-) (limited to 'opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 4d94bf9e..532a033a 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -327,8 +327,6 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } - override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) - override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = schedule(prepare(msg, this, sender, delay)) -- cgit v1.2.3