From b1c4d1f94e35445bdba5a56b614d0ec28a332624 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 11 Jan 2018 16:27:05 +0100 Subject: refactor(#18): Redesign core simulation API This change contains the redesign of the core simulation API and provides a cleaner interface for developing simulation models for the users. --- .../com/atlarge/opendc/omega/MessageContainer.kt | 48 ++++ .../kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 315 +++++++++++++++++++++ .../com/atlarge/opendc/omega/OmegaKernelFactory.kt | 47 +++ .../main/kotlin/com/atlarge/opendc/omega/Resume.kt | 39 +++ .../kotlin/com/atlarge/opendc/omega/Timeout.kt | 35 +++ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 116 ++++++++ 6 files changed, 600 insertions(+) create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt create mode 100644 opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt new file mode 100644 index 00000000..af13d1fd --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Instant +import com.atlarge.opendc.simulator.Entity +import com.atlarge.opendc.simulator.Envelope + +/** + * A wrapper around a message that has been scheduled for processing. + * + * @property message The message to wrap. + * @property time The point in time to deliver the message. + * @property sender The sender of the message. + * @property destination The destination of the message. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal data class MessageContainer(override val message: Any, + val time: Instant, + override val sender: Entity<*, *>?, + override val destination: Entity<*, *>) : Envelope { + /** + * A flag to indicate the message has been canceled. + */ + internal var canceled: Boolean = false +} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt new file mode 100644 index 00000000..fb5ce24b --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -0,0 +1,315 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.Bootstrap +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @property model The model that is simulated. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Context { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap, OmegaContext<*>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * The simulation time. + */ + override var time: Instant = 0 + + /** + * The model of simulation. + */ + override val model: M = bootstrap.bootstrap(this) + + override val , S> E.state: S + get() = context?.state ?: initialState + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val , S, M> E.context: OmegaContext? + get() = registry[this] as? OmegaContext + + override fun register(entity: Entity<*, M>): Boolean { + if (!registry.containsKey(entity) && entity !is Process) { + return false + } + + @Suppress("UNCHECKED_CAST") + val process = entity as Process + val context = OmegaContext(entity).also { registry.put(entity, it) } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + + return true + } + + override fun deregister(entity: Entity<*, M>): Boolean { + val context = entity.context ?: return false + context.resume(Unit) + return true + } + + override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = + schedule(prepare(message, destination, sender, delay)) + + override fun step() { + while (true) { + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > time) { + // Tick has yet to occur + // Jump in time to next event + time = delivery + break + } else if (delivery < time) { + // Tick has already occurred + logger.warn { "message processed out of order" } + } + + queue.poll() + + // If the sender has canceled the message, we move on to the next message + if (envelope.canceled) { + continue + } + + val context = envelope.destination.context ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = time + } + } + + + override fun run() { + while (queue.isNotEmpty()) { + step() + } + } + + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (time >= until) { + return + } + + while (time < until && queue.isNotEmpty()) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (time > until) { + time = until + } + } + + private fun schedule(envelope: MessageContainer) { + queue.add(envelope) + } + + private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, + delay: Duration): MessageContainer { + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } + return MessageContainer(message, time + delay, sender, destination) + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext(val process: Process) : Context, Continuation { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = -1 + + /** + * The model in which the process exists. + */ + override val model: M + get() = this@OmegaKernel.model + + /** + * The state of the entity. + */ + override var state: S = process.initialState + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = this@OmegaKernel.time + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Context] has executed some work. + */ + override val delta: Duration + get() = maxOf(time - last, 0) + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + override val , S> T.state: S + get() = context?.state ?: initialState + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + + suspend override fun receive(transform: suspend Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return transform(envelope, envelope.message) + } + + + suspend override fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + val send = prepare(Timeout, process, process, timeout).also { schedule(it) } + + try { + val received = receiveEnvelope() + + if (received.message !is Timeout) { + send.canceled = true + return transform(received, received.message) + } + + return null + } finally { + send.canceled = true + } + } + + suspend override fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) + + suspend override fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = + schedule(prepare(msg, sender, delay = delay)) + + suspend override fun Entity<*, *>.interrupt() = send(Interrupt) + + suspend override fun hold(duration: Duration) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + if (receive() is Resume) + return + } + } finally { + envelope.canceled = true + } + } + + suspend override fun hold(duration: Duration, queue: Queue) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + val msg = receive() + if (msg is Resume) + return + queue.add(msg) + } + } finally { + envelope.canceled = true + } + } + + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) { + // Deregister process from registry in order to have the GC collect this context + registry.remove(process) + } + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + // Deregister process from registry in order to have the GC collect this context:w + registry.remove(process) + + logger.error(exception) { "An exception occurred during the execution of a process" } + } + } +} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt new file mode 100644 index 00000000..dcad4dce --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt @@ -0,0 +1,47 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.kernel.KernelFactory +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.Bootstrap + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object OmegaKernelFactory : KernelFactory { + /** + * Create a simulation over the given model facilitated by this simulation kernel. + * + * @param bootstrap The bootstrap procedure to bootstrap the simulation with. + * @return A [Kernel] instance to control the simulation. + */ + override fun create(bootstrap: Bootstrap): Kernel = OmegaKernel(bootstrap) +} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt new file mode 100644 index 00000000..d4bd8536 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt @@ -0,0 +1,39 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Process + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up + * and resume execution. + * + * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to + * wake up a process from another entity. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Resume diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt new file mode 100644 index 00000000..c205f6b5 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Process + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been + * reached and that it should wake up and resume execution. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Timeout diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt new file mode 100644 index 00000000..b358d618 --- /dev/null +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -0,0 +1,116 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Process +import com.atlarge.opendc.simulator.Bootstrap +import org.junit.jupiter.api.Test + +/** + * This test suite checks for smoke when running a large amount of simulations. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class SmokeTest { + class EchoProcess : Process { + override val initialState = Unit + suspend override fun Context.run() { + while (true) { + receive { + sender?.send(message) + } + } + } + } + + /** + * Run a large amount of simulations and test if any exceptions occur. + */ + @Test + fun smoke() { + val n = 1000 + val messages = 100 + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + repeat(n) { + EchoProcess().also { + ctx.register(it) + + for (i in 1 until messages) { + ctx.schedule(i, it, delay = i.toLong()) + } + } + } + } + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run() + } + + class NullProcess : Process { + override val initialState = Unit + suspend override fun Context.run() {} + } + + /** + * Test if the kernel allows sending messages to [Context] instances that have already stopped. + */ + @Test + fun `sending message to process that has gracefully stopped`() { + val process = NullProcess() + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + process.also { + ctx.register(it) + ctx.schedule(0, it) + } + } + + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run() + } + + class CrashProcess : Process { + override val initialState = Unit + suspend override fun Context.run() { + TODO("This process should crash") + } + } + + /** + * Test if the kernel allows sending messages to [Context] instances that have crashed. + */ + @Test + fun `sending message to process that has crashed`() { + val process = CrashProcess() + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + process.also { + ctx.register(it) + ctx.schedule(0, it) + } + } + + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run() + } +} -- cgit v1.2.3 From e97d9bf3f2cccf19a21631e26d55d60c9f4d7c7a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 12 Feb 2018 14:26:44 +0100 Subject: refactor(#18): Align formatting with official Kotlin Style Guide This change aligns the code formatting of the project with the official Kotlin Style Guide. They can be found at http://kotlinlang.org/docs/reference/coding-conventions.html. --- .../com/atlarge/opendc/omega/MessageContainer.kt | 16 +- .../kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 543 ++++++++++----------- .../com/atlarge/opendc/omega/OmegaKernelFactory.kt | 18 +- .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 142 +++--- 4 files changed, 359 insertions(+), 360 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt index af13d1fd..32f27111 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt @@ -24,9 +24,9 @@ package com.atlarge.opendc.omega -import com.atlarge.opendc.simulator.Instant import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Envelope +import com.atlarge.opendc.simulator.Instant /** * A wrapper around a message that has been scheduled for processing. @@ -38,11 +38,11 @@ import com.atlarge.opendc.simulator.Envelope * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ internal data class MessageContainer(override val message: Any, - val time: Instant, - override val sender: Entity<*, *>?, - override val destination: Entity<*, *>) : Envelope { - /** - * A flag to indicate the message has been canceled. - */ - internal var canceled: Boolean = false + val time: Instant, + override val sender: Entity<*, *>?, + override val destination: Entity<*, *>) : Envelope { + /** + * A flag to indicate the message has been canceled. + */ + internal var canceled: Boolean = false } diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index fb5ce24b..c729a63d 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -26,7 +26,6 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.Bootstrap import mu.KotlinLogging import java.util.* import kotlin.coroutines.experimental.* @@ -41,275 +40,275 @@ import kotlin.coroutines.experimental.* * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Context { - /** - * The logger instance to use for the simulator. - */ - private val logger = KotlinLogging.logger {} - - /** - * The registry of the simulation kernels used in the experiment. - */ - private val registry: MutableMap, OmegaContext<*>> = HashMap() - - /** - * The message queue. - */ - private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) - - /** - * The simulation time. - */ - override var time: Instant = 0 - - /** - * The model of simulation. - */ - override val model: M = bootstrap.bootstrap(this) - - override val , S> E.state: S - get() = context?.state ?: initialState - - /** - * The context associated with an [Entity]. - */ - @Suppress("UNCHECKED_CAST") - private val , S, M> E.context: OmegaContext? - get() = registry[this] as? OmegaContext - - override fun register(entity: Entity<*, M>): Boolean { - if (!registry.containsKey(entity) && entity !is Process) { - return false - } - - @Suppress("UNCHECKED_CAST") - val process = entity as Process - val context = OmegaContext(entity).also { registry.put(entity, it) } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - - return true - } - - override fun deregister(entity: Entity<*, M>): Boolean { - val context = entity.context ?: return false - context.resume(Unit) - return true - } - - override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = - schedule(prepare(message, destination, sender, delay)) - - override fun step() { - while (true) { - val envelope = queue.peek() ?: return - val delivery = envelope.time - - if (delivery > time) { - // Tick has yet to occur - // Jump in time to next event - time = delivery - break - } else if (delivery < time) { - // Tick has already occurred - logger.warn { "message processed out of order" } - } - - queue.poll() - - // If the sender has canceled the message, we move on to the next message - if (envelope.canceled) { - continue - } - - val context = envelope.destination.context ?: continue - - if (envelope.message !is Interrupt) { - context.continuation.resume(envelope) - } else { - context.continuation.resumeWithException(envelope.message) - } - - context.last = time - } - } - - - override fun run() { - while (queue.isNotEmpty()) { - step() - } - } - - override fun run(until: Instant) { - require(until > 0) { "The given instant must be a non-zero positive number" } - - if (time >= until) { - return - } - - while (time < until && queue.isNotEmpty()) { - step() - } - - // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at - // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will - // just jump forward again. - if (time > until) { - time = until - } - } - - private fun schedule(envelope: MessageContainer) { - queue.add(envelope) - } - - private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { - require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(message, time + delay, sender, destination) - } - - /** - * This internal class provides the default implementation for the [Context] interface for this simulator. - */ - private inner class OmegaContext(val process: Process) : Context, Continuation { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation> - - /** - * The last point in time the process has done some work. - */ - var last: Instant = -1 - - /** - * The model in which the process exists. - */ - override val model: M - get() = this@OmegaKernel.model - - /** - * The state of the entity. - */ - override var state: S = process.initialState - - /** - * The current point in simulation time. - */ - override val time: Instant - get() = this@OmegaKernel.time - - /** - * The duration between the current point in simulation time and the last point in simulation time where the - * [Context] has executed some work. - */ - override val delta: Duration - get() = maxOf(time - last, 0) - - /** - * The [CoroutineContext] for a [Context]. - */ - override val context: CoroutineContext = EmptyCoroutineContext - - /** - * The observable state of an [Entity] within the simulation is provided by the context of the simulation. - */ - override val , S> T.state: S - get() = context?.state ?: initialState - - /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the - * message has been received. - * - * @return The envelope containing the message. - */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } - - suspend override fun receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - - - suspend override fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { - val send = prepare(Timeout, process, process, timeout).also { schedule(it) } - - try { - val received = receiveEnvelope() - - if (received.message !is Timeout) { - send.canceled = true - return transform(received, received.message) - } - - return null - } finally { - send.canceled = true - } - } - - suspend override fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) - - suspend override fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = - schedule(prepare(msg, sender, delay = delay)) - - suspend override fun Entity<*, *>.interrupt() = send(Interrupt) - - suspend override fun hold(duration: Duration) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - if (receive() is Resume) - return - } - } finally { - envelope.canceled = true - } - } - - suspend override fun hold(duration: Duration, queue: Queue) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - val msg = receive() - if (msg is Resume) - return - queue.add(msg) - } - } finally { - envelope.canceled = true - } - } - - - // Completion continuation implementation - /** - * Resume the execution of this continuation with the given value. - * - * @param value The value to resume with. - */ - override fun resume(value: Unit) { - // Deregister process from registry in order to have the GC collect this context - registry.remove(process) - } - - /** - * Resume the execution of this continuation with an exception. - * - * @param exception The exception to resume with. - */ - override fun resumeWithException(exception: Throwable) { - // Deregister process from registry in order to have the GC collect this context:w - registry.remove(process) - - logger.error(exception) { "An exception occurred during the execution of a process" } - } - } + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap, OmegaContext<*>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * The simulation time. + */ + override var time: Instant = 0 + + /** + * The model of simulation. + */ + override val model: M = bootstrap.bootstrap(this) + + override val , S> E.state: S + get() = context?.state ?: initialState + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val , S, M> E.context: OmegaContext? + get() = registry[this] as? OmegaContext + + override fun register(entity: Entity<*, M>): Boolean { + if (!registry.containsKey(entity) && entity !is Process) { + return false + } + + @Suppress("UNCHECKED_CAST") + val process = entity as Process + val context = OmegaContext(entity).also { registry[entity] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + + return true + } + + override fun deregister(entity: Entity<*, M>): Boolean { + val context = entity.context ?: return false + context.resume(Unit) + return true + } + + override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = + schedule(prepare(message, destination, sender, delay)) + + override fun step() { + while (true) { + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > time) { + // Tick has yet to occur + // Jump in time to next event + time = delivery + break + } else if (delivery < time) { + // Tick has already occurred + logger.warn { "message processed out of order" } + } + + queue.poll() + + // If the sender has canceled the message, we move on to the next message + if (envelope.canceled) { + continue + } + + val context = envelope.destination.context ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = time + } + } + + + override fun run() { + while (queue.isNotEmpty()) { + step() + } + } + + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (time >= until) { + return + } + + while (time < until && queue.isNotEmpty()) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (time > until) { + time = until + } + } + + private fun schedule(envelope: MessageContainer) { + queue.add(envelope) + } + + private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, + delay: Duration): MessageContainer { + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } + return MessageContainer(message, time + delay, sender, destination) + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext(val process: Process) : Context, Continuation { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = -1 + + /** + * The model in which the process exists. + */ + override val model: M + get() = this@OmegaKernel.model + + /** + * The state of the entity. + */ + override var state: S = process.initialState + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = this@OmegaKernel.time + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Context] has executed some work. + */ + override val delta: Duration + get() = maxOf(time - last, 0) + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + override val , S> T.state: S + get() = context?.state ?: initialState + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + + override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return transform(envelope, envelope.message) + } + + + override suspend fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + val send = prepare(Timeout, process, process, timeout).also { schedule(it) } + + try { + val received = receiveEnvelope() + + if (received.message == Timeout) { + send.canceled = true + return transform(received, received.message) + } + + return null + } finally { + send.canceled = true + } + } + + override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) + + override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = + schedule(prepare(msg, sender, delay = delay)) + + override suspend fun Entity<*, *>.interrupt() = send(Interrupt) + + override suspend fun hold(duration: Duration) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + if (receive() == Resume) + return + } + } finally { + envelope.canceled = true + } + } + + override suspend fun hold(duration: Duration, queue: Queue) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + val msg = receive() + if (msg == Resume) + return + queue.add(msg) + } + } finally { + envelope.canceled = true + } + } + + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) { + // Deregister process from registry in order to have the GC collect this context + registry.remove(process) + } + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + // Deregister process from registry in order to have the GC collect this context:w + registry.remove(process) + + logger.error(exception) { "An exception occurred during the execution of a process" } + } + } } diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt index dcad4dce..139cbd19 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt @@ -24,9 +24,9 @@ package com.atlarge.opendc.omega -import com.atlarge.opendc.simulator.kernel.KernelFactory -import com.atlarge.opendc.simulator.kernel.Kernel import com.atlarge.opendc.simulator.Bootstrap +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.kernel.KernelFactory /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. @@ -37,11 +37,11 @@ import com.atlarge.opendc.simulator.Bootstrap * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ object OmegaKernelFactory : KernelFactory { - /** - * Create a simulation over the given model facilitated by this simulation kernel. - * - * @param bootstrap The bootstrap procedure to bootstrap the simulation with. - * @return A [Kernel] instance to control the simulation. - */ - override fun create(bootstrap: Bootstrap): Kernel = OmegaKernel(bootstrap) + /** + * Create a simulation over the given model facilitated by this simulation kernel. + * + * @param bootstrap The bootstrap procedure to bootstrap the simulation with. + * @return A [Kernel] instance to control the simulation. + */ + override fun create(bootstrap: Bootstrap): Kernel = OmegaKernel(bootstrap) } diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index b358d618..a1ec8e88 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -24,9 +24,9 @@ package com.atlarge.opendc.omega +import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process -import com.atlarge.opendc.simulator.Bootstrap import org.junit.jupiter.api.Test /** @@ -35,82 +35,82 @@ import org.junit.jupiter.api.Test * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ internal class SmokeTest { - class EchoProcess : Process { - override val initialState = Unit - suspend override fun Context.run() { - while (true) { - receive { - sender?.send(message) - } - } - } - } + class EchoProcess : Process { + override val initialState = Unit + override suspend fun Context.run() { + while (true) { + receive { + sender?.send(message) + } + } + } + } - /** - * Run a large amount of simulations and test if any exceptions occur. - */ - @Test - fun smoke() { - val n = 1000 - val messages = 100 - val bootstrap: Bootstrap = Bootstrap.create { ctx -> - repeat(n) { - EchoProcess().also { - ctx.register(it) + /** + * Run a large amount of simulations and test if any exceptions occur. + */ + @Test + fun smoke() { + val n = 1000 + val messages = 100 + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + repeat(n) { + EchoProcess().also { + ctx.register(it) - for (i in 1 until messages) { - ctx.schedule(i, it, delay = i.toLong()) - } - } - } - } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() - } + for (i in 1 until messages) { + ctx.schedule(i, it, delay = i.toLong()) + } + } + } + } + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run() + } - class NullProcess : Process { - override val initialState = Unit - suspend override fun Context.run() {} - } + class NullProcess : Process { + override val initialState = Unit + override suspend fun Context.run() {} + } - /** - * Test if the kernel allows sending messages to [Context] instances that have already stopped. - */ - @Test - fun `sending message to process that has gracefully stopped`() { - val process = NullProcess() - val bootstrap: Bootstrap = Bootstrap.create { ctx -> - process.also { - ctx.register(it) - ctx.schedule(0, it) - } - } + /** + * Test if the kernel allows sending messages to [Context] instances that have already stopped. + */ + @Test + fun `sending message to process that has gracefully stopped`() { + val process = NullProcess() + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + process.also { + ctx.register(it) + ctx.schedule(0, it) + } + } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() - } + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run() + } - class CrashProcess : Process { - override val initialState = Unit - suspend override fun Context.run() { - TODO("This process should crash") - } - } + class CrashProcess : Process { + override val initialState = Unit + override suspend fun Context.run() { + TODO("This process should crash") + } + } - /** - * Test if the kernel allows sending messages to [Context] instances that have crashed. - */ - @Test - fun `sending message to process that has crashed`() { - val process = CrashProcess() - val bootstrap: Bootstrap = Bootstrap.create { ctx -> - process.also { - ctx.register(it) - ctx.schedule(0, it) - } - } + /** + * Test if the kernel allows sending messages to [Context] instances that have crashed. + */ + @Test + fun `sending message to process that has crashed`() { + val process = CrashProcess() + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + process.also { + ctx.register(it) + ctx.schedule(0, it) + } + } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() - } + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run() + } } -- cgit v1.2.3 From 407877ff24d3c54747c8b15bba73b93b38c8b6e7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 12 Feb 2018 22:27:44 +0100 Subject: refactor(#18): Launch processes at initial run This change will make the simulation kernel launch the processes at the initial run instead of when the processes are registered. --- .../kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 37 ++++++++++++++++------ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 36 ++++++++++++++++++--- 2 files changed, 59 insertions(+), 14 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index c729a63d..91150078 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -55,6 +55,11 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co */ private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + /** + * The processes to be spawned. + */ + private val spawnings: Queue> = ArrayDeque() + /** * The simulation time. */ @@ -80,14 +85,8 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co return false } - @Suppress("UNCHECKED_CAST") - val process = entity as Process - val context = OmegaContext(entity).also { registry[entity] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - + val process = entity as Process<*, M> + spawnings.add(process) return true } @@ -102,6 +101,17 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override fun step() { while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + val envelope = queue.peek() ?: return val delivery = envelope.time @@ -136,7 +146,7 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override fun run() { - while (queue.isNotEmpty()) { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { step() } } @@ -148,7 +158,7 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co return } - while (time < until && queue.isNotEmpty()) { + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { step() } @@ -219,6 +229,13 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override val , S> T.state: S get() = context?.state ?: initialState + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the * message has been received. diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index a1ec8e88..48b4f02a 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,6 +27,8 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -68,7 +70,7 @@ internal class SmokeTest { kernel.run() } - class NullProcess : Process { + object NullProcess : Process { override val initialState = Unit override suspend fun Context.run() {} } @@ -78,7 +80,7 @@ internal class SmokeTest { */ @Test fun `sending message to process that has gracefully stopped`() { - val process = NullProcess() + val process = NullProcess val bootstrap: Bootstrap = Bootstrap.create { ctx -> process.also { ctx.register(it) @@ -90,7 +92,7 @@ internal class SmokeTest { kernel.run() } - class CrashProcess : Process { + object CrashProcess : Process { override val initialState = Unit override suspend fun Context.run() { TODO("This process should crash") @@ -102,7 +104,7 @@ internal class SmokeTest { */ @Test fun `sending message to process that has crashed`() { - val process = CrashProcess() + val process = CrashProcess val bootstrap: Bootstrap = Bootstrap.create { ctx -> process.also { ctx.register(it) @@ -113,4 +115,30 @@ internal class SmokeTest { val kernel = OmegaKernelFactory.create(bootstrap) kernel.run() } + + class ModelProcess(private val value: Int) : Process { + override val initialState = false + override suspend fun Context.run() { + assertEquals(value, model) + state = true + hold(10) + } + } + /** + * Test if the kernel allows access to the simulation model object. + */ + @Test + fun `access simulation model`() { + val value = 1 + val process = ModelProcess(value) + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + ctx.register(process) + value + } + + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run(5) + + assertTrue(kernel.run { process.state }) + } } -- cgit v1.2.3 From 2e819d59934b5308bc58d6c69709112e2a6af402 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 13 Feb 2018 14:27:15 +0100 Subject: refactor(#18): Fix OpenDC model deployment This change fixes the deployment of the OpenDC simulation model. --- .../src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index 91150078..6ece73aa 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -122,7 +122,7 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co break } else if (delivery < time) { // Tick has already occurred - logger.warn { "message processed out of order" } + logger.warn { "Message processed out of order" } } queue.poll() @@ -270,7 +270,7 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = - schedule(prepare(msg, sender, delay = delay)) + schedule(prepare(msg, this, sender, delay)) override suspend fun Entity<*, *>.interrupt() = send(Interrupt) -- cgit v1.2.3 From 157d30beb52c75831e29a1a22c199b95d6d30b42 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:32:57 +0100 Subject: refactor(#18): Create distinction between kernel and simulation This change creates a distinction between a kernel and a simulation. A single simulation is represented by a `Simulation` object which provides control over the simulation, while the `Kernel` interface allows users to create a new simulation using that kernel as backend. --- .../kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 300 +------------------ .../com/atlarge/opendc/omega/OmegaKernelFactory.kt | 47 --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 331 +++++++++++++++++++++ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 18 +- 4 files changed, 348 insertions(+), 348 deletions(-) delete mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index 6ece73aa..e0d70eed 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -24,11 +24,9 @@ package com.atlarge.opendc.omega -import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.Bootstrap +import com.atlarge.opendc.simulator.kernel.Simulation import com.atlarge.opendc.simulator.kernel.Kernel -import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.experimental.* /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. @@ -36,296 +34,14 @@ import kotlin.coroutines.experimental.* * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. * - * @property model The model that is simulated. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Context { +object OmegaKernel : Kernel { /** - * The logger instance to use for the simulator. + * Create a simulation over the given model facilitated by this simulation kernel. + * + * @param bootstrap The apply procedure to apply the simulation with. + * @return A [Simulation] instance to control the simulation. */ - private val logger = KotlinLogging.logger {} - - /** - * The registry of the simulation kernels used in the experiment. - */ - private val registry: MutableMap, OmegaContext<*>> = HashMap() - - /** - * The message queue. - */ - private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) - - /** - * The processes to be spawned. - */ - private val spawnings: Queue> = ArrayDeque() - - /** - * The simulation time. - */ - override var time: Instant = 0 - - /** - * The model of simulation. - */ - override val model: M = bootstrap.bootstrap(this) - - override val , S> E.state: S - get() = context?.state ?: initialState - - /** - * The context associated with an [Entity]. - */ - @Suppress("UNCHECKED_CAST") - private val , S, M> E.context: OmegaContext? - get() = registry[this] as? OmegaContext - - override fun register(entity: Entity<*, M>): Boolean { - if (!registry.containsKey(entity) && entity !is Process) { - return false - } - - val process = entity as Process<*, M> - spawnings.add(process) - return true - } - - override fun deregister(entity: Entity<*, M>): Boolean { - val context = entity.context ?: return false - context.resume(Unit) - return true - } - - override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = - schedule(prepare(message, destination, sender, delay)) - - override fun step() { - while (true) { - // Initialise all spawned processes - while (spawnings.isNotEmpty()) { - val process = spawnings.poll() - val context = OmegaContext(process).also { registry[process] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { context.start() } - block.startCoroutine(context) - - } - - val envelope = queue.peek() ?: return - val delivery = envelope.time - - if (delivery > time) { - // Tick has yet to occur - // Jump in time to next event - time = delivery - break - } else if (delivery < time) { - // Tick has already occurred - logger.warn { "Message processed out of order" } - } - - queue.poll() - - // If the sender has canceled the message, we move on to the next message - if (envelope.canceled) { - continue - } - - val context = envelope.destination.context ?: continue - - if (envelope.message !is Interrupt) { - context.continuation.resume(envelope) - } else { - context.continuation.resumeWithException(envelope.message) - } - - context.last = time - } - } - - - override fun run() { - while (queue.isNotEmpty() || spawnings.isNotEmpty()) { - step() - } - } - - override fun run(until: Instant) { - require(until > 0) { "The given instant must be a non-zero positive number" } - - if (time >= until) { - return - } - - while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { - step() - } - - // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at - // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will - // just jump forward again. - if (time > until) { - time = until - } - } - - private fun schedule(envelope: MessageContainer) { - queue.add(envelope) - } - - private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { - require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(message, time + delay, sender, destination) - } - - /** - * This internal class provides the default implementation for the [Context] interface for this simulator. - */ - private inner class OmegaContext(val process: Process) : Context, Continuation { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation> - - /** - * The last point in time the process has done some work. - */ - var last: Instant = -1 - - /** - * The model in which the process exists. - */ - override val model: M - get() = this@OmegaKernel.model - - /** - * The state of the entity. - */ - override var state: S = process.initialState - - /** - * The current point in simulation time. - */ - override val time: Instant - get() = this@OmegaKernel.time - - /** - * The duration between the current point in simulation time and the last point in simulation time where the - * [Context] has executed some work. - */ - override val delta: Duration - get() = maxOf(time - last, 0) - - /** - * The [CoroutineContext] for a [Context]. - */ - override val context: CoroutineContext = EmptyCoroutineContext - - /** - * The observable state of an [Entity] within the simulation is provided by the context of the simulation. - */ - override val , S> T.state: S - get() = context?.state ?: initialState - - /** - * Start the process associated with this context. - */ - internal suspend fun start() = process.run { - run() - } - - /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the - * message has been received. - * - * @return The envelope containing the message. - */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } - - override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - - - override suspend fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { - val send = prepare(Timeout, process, process, timeout).also { schedule(it) } - - try { - val received = receiveEnvelope() - - if (received.message == Timeout) { - send.canceled = true - return transform(received, received.message) - } - - return null - } finally { - send.canceled = true - } - } - - override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) - - override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = - schedule(prepare(msg, this, sender, delay)) - - override suspend fun Entity<*, *>.interrupt() = send(Interrupt) - - override suspend fun hold(duration: Duration) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - if (receive() == Resume) - return - } - } finally { - envelope.canceled = true - } - } - - override suspend fun hold(duration: Duration, queue: Queue) { - require(duration >= 0) { "The amount of time to hold must be a positive number" } - val envelope = prepare(Resume, process, process, duration).also { schedule(it) } - - try { - while (true) { - val msg = receive() - if (msg == Resume) - return - queue.add(msg) - } - } finally { - envelope.canceled = true - } - } - - - // Completion continuation implementation - /** - * Resume the execution of this continuation with the given value. - * - * @param value The value to resume with. - */ - override fun resume(value: Unit) { - // Deregister process from registry in order to have the GC collect this context - registry.remove(process) - } - - /** - * Resume the execution of this continuation with an exception. - * - * @param exception The exception to resume with. - */ - override fun resumeWithException(exception: Throwable) { - // Deregister process from registry in order to have the GC collect this context:w - registry.remove(process) - - logger.error(exception) { "An exception occurred during the execution of a process" } - } - } + override fun create(bootstrap: Bootstrap): Simulation = OmegaSimulation(bootstrap) } diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt deleted file mode 100644 index 139cbd19..00000000 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernelFactory.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.omega - -import com.atlarge.opendc.simulator.Bootstrap -import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.kernel.KernelFactory - -/** - * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. - * - * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and - * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -object OmegaKernelFactory : KernelFactory { - /** - * Create a simulation over the given model facilitated by this simulation kernel. - * - * @param bootstrap The bootstrap procedure to bootstrap the simulation with. - * @return A [Kernel] instance to control the simulation. - */ - override fun create(bootstrap: Bootstrap): Kernel = OmegaKernel(bootstrap) -} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt new file mode 100644 index 00000000..80ac4600 --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -0,0 +1,331 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.kernel.Simulation +import mu.KotlinLogging +import java.util.* +import kotlin.coroutines.experimental.* + +/** + * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. + * + * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and + * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities. + * + * @property model The model that is simulated. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Bootstrap.Context { + /** + * The logger instance to use for the simulator. + */ + private val logger = KotlinLogging.logger {} + + /** + * The registry of the simulation kernels used in the experiment. + */ + private val registry: MutableMap, OmegaContext<*>> = HashMap() + + /** + * The message queue. + */ + private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + + /** + * The processes to be spawned. + */ + private val spawnings: Queue> = ArrayDeque() + + /** + * The simulation time. + */ + override var time: Instant = 0 + + /** + * The model of simulation. + */ + override val model: M = bootstrap.apply(this) + + override val , S> E.state: S + get() = context?.state ?: initialState + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val , S, M> E.context: OmegaContext? + get() = registry[this] as? OmegaContext + + override fun register(entity: Entity<*, M>): Boolean { + if (!registry.containsKey(entity) && entity !is Process) { + return false + } + + val process = entity as Process<*, M> + spawnings.add(process) + return true + } + + override fun deregister(entity: Entity<*, M>): Boolean { + val context = entity.context ?: return false + context.resume(Unit) + return true + } + + override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = + schedule(prepare(message, destination, sender, delay)) + + override fun step() { + while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + + val envelope = queue.peek() ?: return + val delivery = envelope.time + + if (delivery > time) { + // Tick has yet to occur + // Jump in time to next event + time = delivery + break + } else if (delivery < time) { + // Tick has already occurred + logger.warn { "Message processed out of order" } + } + + queue.poll() + + // If the sender has canceled the message, we move on to the next message + if (envelope.canceled) { + continue + } + + val context = envelope.destination.context ?: continue + + if (envelope.message !is Interrupt) { + context.continuation.resume(envelope) + } else { + context.continuation.resumeWithException(envelope.message) + } + + context.last = time + } + } + + + override fun run() { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { + step() + } + } + + override fun run(until: Instant) { + require(until > 0) { "The given instant must be a non-zero positive number" } + + if (time >= until) { + return + } + + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { + step() + } + + // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at + // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will + // just jump forward again. + if (time > until) { + time = until + } + } + + private fun schedule(envelope: MessageContainer) { + queue.add(envelope) + } + + private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, + delay: Duration): MessageContainer { + require(delay >= 0) { "The amount of time to delay the message must be a positive number" } + return MessageContainer(message, time + delay, sender, destination) + } + + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext(val process: Process) : Context, Continuation { + /** + * The continuation to resume the execution of the process. + */ + lateinit var continuation: Continuation> + + /** + * The last point in time the process has done some work. + */ + var last: Instant = -1 + + /** + * The model in which the process exists. + */ + override val model: M + get() = this@OmegaSimulation.model + + /** + * The state of the entity. + */ + override var state: S = process.initialState + + /** + * The current point in simulation time. + */ + override val time: Instant + get() = this@OmegaSimulation.time + + /** + * The duration between the current point in simulation time and the last point in simulation time where the + * [Context] has executed some work. + */ + override val delta: Duration + get() = maxOf(time - last, 0) + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + + /** + * The observable state of an [Entity] within the simulation is provided by the context of the simulation. + */ + override val , S> T.state: S + get() = context?.state ?: initialState + + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + + override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { + val envelope = receiveEnvelope() + return transform(envelope, envelope.message) + } + + + override suspend fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + val send = prepare(Timeout, process, process, timeout).also { schedule(it) } + + try { + val received = receiveEnvelope() + + if (received.message == Timeout) { + send.canceled = true + return transform(received, received.message) + } + + return null + } finally { + send.canceled = true + } + } + + override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) + + override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = + schedule(prepare(msg, this, sender, delay)) + + override suspend fun Entity<*, *>.interrupt() = send(Interrupt) + + override suspend fun hold(duration: Duration) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + if (receive() == Resume) + return + } + } finally { + envelope.canceled = true + } + } + + override suspend fun hold(duration: Duration, queue: Queue) { + require(duration >= 0) { "The amount of time to hold must be a positive number" } + val envelope = prepare(Resume, process, process, duration).also { schedule(it) } + + try { + while (true) { + val msg = receive() + if (msg == Resume) + return + queue.add(msg) + } + } finally { + envelope.canceled = true + } + } + + + // Completion continuation implementation + /** + * Resume the execution of this continuation with the given value. + * + * @param value The value to resume with. + */ + override fun resume(value: Unit) { + // Deregister process from registry in order to have the GC collect this context + registry.remove(process) + } + + /** + * Resume the execution of this continuation with an exception. + * + * @param exception The exception to resume with. + */ + override fun resumeWithException(exception: Throwable) { + // Deregister process from registry in order to have the GC collect this context:w + registry.remove(process) + + logger.error(exception) { "An exception occurred during the execution of a process" } + } + } +} diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 48b4f02a..74fa686b 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -66,8 +66,8 @@ internal class SmokeTest { } } } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() + val simulation = OmegaKernel.create(bootstrap) + simulation.run() } object NullProcess : Process { @@ -88,8 +88,8 @@ internal class SmokeTest { } } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() + val simulation = OmegaKernel.create(bootstrap) + simulation.run() } object CrashProcess : Process { @@ -112,8 +112,8 @@ internal class SmokeTest { } } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run() + val simulation = OmegaKernel.create(bootstrap) + simulation.run() } class ModelProcess(private val value: Int) : Process { @@ -136,9 +136,9 @@ internal class SmokeTest { value } - val kernel = OmegaKernelFactory.create(bootstrap) - kernel.run(5) + val simulation = OmegaKernel.create(bootstrap) + simulation.run(5) - assertTrue(kernel.run { process.state }) + assertTrue(simulation.run { process.state }) } } -- cgit v1.2.3 From 99dc5592bbca0f7d3a9d411268608c112b135896 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 15 Feb 2018 16:18:55 +0100 Subject: refactor(#18): Handle process launch using a kernel process This change will make the kernel handle the launch of processes using a kernel process that is launched at the start of the simulation and launches processes when it receives `Launch` messages. --- .../com/atlarge/opendc/omega/MessageContainer.kt | 48 ------- .../kotlin/com/atlarge/opendc/omega/Messages.kt | 30 +++++ .../com/atlarge/opendc/omega/OmegaSimulation.kt | 142 ++++++++++++++------- .../main/kotlin/com/atlarge/opendc/omega/Resume.kt | 39 ------ .../kotlin/com/atlarge/opendc/omega/Timeout.kt | 35 ----- 5 files changed, 129 insertions(+), 165 deletions(-) delete mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt create mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt delete mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt delete mode 100644 opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt deleted file mode 100644 index 32f27111..00000000 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.omega - -import com.atlarge.opendc.simulator.Entity -import com.atlarge.opendc.simulator.Envelope -import com.atlarge.opendc.simulator.Instant - -/** - * A wrapper around a message that has been scheduled for processing. - * - * @property message The message to wrap. - * @property time The point in time to deliver the message. - * @property sender The sender of the message. - * @property destination The destination of the message. - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -internal data class MessageContainer(override val message: Any, - val time: Instant, - override val sender: Entity<*, *>?, - override val destination: Entity<*, *>) : Envelope { - /** - * A flag to indicate the message has been canceled. - */ - internal var canceled: Boolean = false -} diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt new file mode 100644 index 00000000..73c3676f --- /dev/null +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt @@ -0,0 +1,30 @@ +package com.atlarge.opendc.omega + +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Process + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up + * and resume execution. + * + * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to + * wake up a process from another entity. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Resume + +/** + * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been + * reached and that it should wake up and resume execution. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +object Timeout + +/** + * An internal message used by the Omega simulation kernel to launch a process. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +data class Launch(val process: Process<*, M>) diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 80ac4600..71b20e34 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -46,7 +46,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot private val logger = KotlinLogging.logger {} /** - * The registry of the simulation kernels used in the experiment. + * The registry of the processes used in the simulation. */ private val registry: MutableMap, OmegaContext<*>> = HashMap() @@ -56,9 +56,29 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** - * The processes to be spawned. + * The kernel process instance that handles internal operations during the simulation. */ - private val spawnings: Queue> = ArrayDeque() + private val process = object : Process { + override val initialState = Unit + + override suspend fun Context.run() { + while(true) { + val msg = receive() + when (msg) { + is Launch<*> -> + @Suppress("UNCHECKED_CAST") + launch((msg as Launch).process) + } + } + } + } + + /** + * The context associated with an [Entity]. + */ + @Suppress("UNCHECKED_CAST") + private val , S, M> E.context: OmegaContext? + get() = registry[this] as? OmegaContext /** * The simulation time. @@ -68,25 +88,30 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The model of simulation. */ - override val model: M = bootstrap.apply(this) + // XXX: the bootstrap requires the properties of this class to be initialised, so changing the order may cause NPEs + override var model: M = bootstrap.apply(this) + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ override val , S> E.state: S get() = context?.state ?: initialState /** - * The context associated with an [Entity]. + * Initialise the simulation instance. */ - @Suppress("UNCHECKED_CAST") - private val , S, M> E.context: OmegaContext? - get() = registry[this] as? OmegaContext + init { + // Launch the Omega kernel process + launch(process) + } + // Bootstrap Context implementation override fun register(entity: Entity<*, M>): Boolean { if (!registry.containsKey(entity) && entity !is Process) { return false } - val process = entity as Process<*, M> - spawnings.add(process) + schedule(Launch(entity as Process<*, M>), process) return true } @@ -99,19 +124,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) = schedule(prepare(message, destination, sender, delay)) + // Simulation implementation override fun step() { while (true) { - // Initialise all spawned processes - while (spawnings.isNotEmpty()) { - val process = spawnings.poll() - val context = OmegaContext(process).also { registry[process] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { context.start() } - block.startCoroutine(context) - - } - val envelope = queue.peek() ?: return val delivery = envelope.time @@ -144,9 +159,8 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } - override fun run() { - while (queue.isNotEmpty() || spawnings.isNotEmpty()) { + while (queue.isNotEmpty()) { step() } } @@ -158,7 +172,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot return } - while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { + while (time < until && queue.isNotEmpty()) { step() } @@ -170,10 +184,42 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } + /** + * A wrapper around a message that has been scheduled for processing. + * + * @property message The message to wrap. + * @property time The point in time to deliver the message. + * @property sender The sender of the message. + * @property destination The destination of the message. + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ + private data class MessageContainer(override val message: Any, + val time: Instant, + override val sender: Entity<*, *>?, + override val destination: Entity<*, *>) : Envelope { + /** + * A flag to indicate the message has been canceled. + */ + internal var canceled: Boolean = false + } + + /** + * Schedule the given envelope to be processed by the kernel. + * + * @param envelope The envelope containing the message to schedule. + */ private fun schedule(envelope: MessageContainer) { queue.add(envelope) } + /** + * Prepare a message for scheduling by wrapping it into an envelope. + * + * @param message The message to send. + * @param destination The destination entity that should receive the message. + * @param sender The optional sender of the message. + * @param delay The time to delay the message. + */ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, delay: Duration): MessageContainer { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } @@ -181,19 +227,22 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } /** - * This internal class provides the default implementation for the [Context] interface for this simulator. + * Launch the given [Process]. + * + * @param process The process to launch. */ - private inner class OmegaContext(val process: Process) : Context, Continuation { - /** - * The continuation to resume the execution of the process. - */ - lateinit var continuation: Continuation> + private fun launch(process: Process<*, M>) { + val context = OmegaContext(process).also { registry[process] = it } - /** - * The last point in time the process has done some work. - */ - var last: Instant = -1 + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + } + /** + * This internal class provides the default implementation for the [Context] interface for this simulator. + */ + private inner class OmegaContext(val process: Process) : Context, Continuation { /** * The model in which the process exists. */ @@ -230,19 +279,14 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot get() = context?.state ?: initialState /** - * Start the process associated with this context. + * The continuation to resume the execution of the process. */ - internal suspend fun start() = process.run { - run() - } + lateinit var continuation: Continuation> /** - * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the - * message has been received. - * - * @return The envelope containing the message. + * The last point in time the process has done some work. */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + var last: Instant = -1 override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { val envelope = receiveEnvelope() @@ -304,6 +348,18 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { run() } + + /** + * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the + * message has been received. + * + * @return The envelope containing the message. + */ + suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } // Completion continuation implementation /** diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt deleted file mode 100644 index d4bd8536..00000000 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.omega - -import com.atlarge.opendc.simulator.Context -import com.atlarge.opendc.simulator.Process - -/** - * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up - * and resume execution. - * - * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to - * wake up a process from another entity. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -object Resume diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt deleted file mode 100644 index c205f6b5..00000000 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2017 atlarge-research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.atlarge.opendc.omega - -import com.atlarge.opendc.simulator.Process - -/** - * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been - * reached and that it should wake up and resume execution. - * - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) - */ -object Timeout -- cgit v1.2.3 From 2fa0134773a99394aae0efc167af6767e2828c71 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 16 Feb 2018 14:32:16 +0100 Subject: refactor(#18): Fix broken receive() timeout This change fixes the broken implementation of the `receive()` method with a timeout due to an invalid condition. --- .../src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 71b20e34..9c6d4ab3 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -300,7 +300,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot try { val received = receiveEnvelope() - if (received.message == Timeout) { + if (received.message != Timeout) { send.canceled = true return transform(received, received.message) } -- cgit v1.2.3 From b84a995a05fecb9ef90c9184959f285f324e7411 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 16 Feb 2018 14:39:53 +0100 Subject: refactor(#18): Provide access to latest sender This change adds a `sender` property to the `Context` interface to provide processes access to the sender of the latest received message. Please note that methods like `hold()` and `interrupt()` may change the value of this property. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 9c6d4ab3..bd3f4529 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -249,11 +249,6 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override val model: M get() = this@OmegaSimulation.model - /** - * The state of the entity. - */ - override var state: S = process.initialState - /** * The current point in simulation time. */ @@ -268,9 +263,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot get() = maxOf(time - last, 0) /** - * The [CoroutineContext] for a [Context]. + * The state of the entity. */ - override val context: CoroutineContext = EmptyCoroutineContext + override var state: S = process.initialState /** * The observable state of an [Entity] within the simulation is provided by the context of the simulation. @@ -278,6 +273,16 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override val , S> T.state: S get() = context?.state ?: initialState + /** + * The sender of the last received message or `null` in case the process has not received any messages yet. + */ + override var sender: Entity<*, *>? = null + + /** + * The [CoroutineContext] for a [Context]. + */ + override val context: CoroutineContext = EmptyCoroutineContext + /** * The continuation to resume the execution of the process. */ @@ -359,7 +364,8 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * * @return The envelope containing the message. */ - suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it } + suspend fun receiveEnvelope() = suspendCoroutine> { continuation = it } + .also { sender = it.sender } // Completion continuation implementation /** -- cgit v1.2.3 From f8a4095d1824df095ea91253f914bc0512646684 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 16 Feb 2018 15:25:19 +0100 Subject: refactor(#18): Provide access to process context in nested calls This change provides a method in the standard library to access the process context in nested suspending function calls. --- .../src/main/kotlin/com/atlarge/opendc/omega/Messages.kt | 2 +- .../main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt index 73c3676f..d63a53c8 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt @@ -27,4 +27,4 @@ object Timeout * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -data class Launch(val process: Process<*, M>) +data class Launch(val process: Process<*, M>) diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index bd3f4529..22382ccd 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -242,7 +242,8 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * This internal class provides the default implementation for the [Context] interface for this simulator. */ - private inner class OmegaContext(val process: Process) : Context, Continuation { + private inner class OmegaContext(val process: Process) : Context, Continuation, + AbstractCoroutineContextElement(Context) { /** * The model in which the process exists. */ @@ -255,6 +256,12 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override val time: Instant get() = this@OmegaSimulation.time + /** + * The [Entity] associated with this context. + */ + override val self: Entity + get() = process + /** * The duration between the current point in simulation time and the last point in simulation time where the * [Context] has executed some work. @@ -281,7 +288,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The [CoroutineContext] for a [Context]. */ - override val context: CoroutineContext = EmptyCoroutineContext + override val context: CoroutineContext = this /** * The continuation to resume the execution of the process. @@ -321,7 +328,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = schedule(prepare(msg, this, sender, delay)) - override suspend fun Entity<*, *>.interrupt() = send(Interrupt) + override suspend fun Entity<*, *>.interrupt(interrupt: Interrupt) = send(interrupt) override suspend fun hold(duration: Duration) { require(duration >= 0) { "The amount of time to hold must be a positive number" } -- cgit v1.2.3 From 0f84a74536e404f707c4f4e370efbaad8d12b89a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sat, 17 Feb 2018 11:18:07 +0100 Subject: refactor(#18): Optimize imports --- .../src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index e0d70eed..c0ab9fb4 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -25,8 +25,8 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap -import com.atlarge.opendc.simulator.kernel.Simulation import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.kernel.Simulation /** * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core. -- cgit v1.2.3 From ac0a8505ceb817e33011ff80f8585b199896b9c4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 19 Feb 2018 14:15:55 +0100 Subject: bug: Make message priority queue stable This change fixes the bug where the insertion order into the message queue was not guaranteed for messages arriving at the same time, causing some non-deterministic behaviour. --- .../kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 22382ccd..8c1497c8 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -53,7 +53,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The message queue. */ - private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + private val queue: Queue = PriorityQueue(Comparator + .comparingLong(MessageContainer::time) + .thenComparingLong(MessageContainer::id)) /** * The kernel process instance that handles internal operations during the simulation. @@ -184,16 +186,23 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } + /** + * The identifier for the next message to be scheduled. + */ + private var nextId: Long = 0 + /** * A wrapper around a message that has been scheduled for processing. * + * @property id The identifier of the message to keep the priority queue stable * @property message The message to wrap. * @property time The point in time to deliver the message. * @property sender The sender of the message. * @property destination The destination of the message. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ - private data class MessageContainer(override val message: Any, + private data class MessageContainer(val id: Long, + override val message: Any, val time: Instant, override val sender: Entity<*, *>?, override val destination: Entity<*, *>) : Envelope { @@ -223,7 +232,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, delay: Duration): MessageContainer { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(message, time + delay, sender, destination) + return MessageContainer(nextId++, message, time + delay, sender, destination) } /** -- cgit v1.2.3 From 59247a4f7a2dc948b3a63ff185c64922eb4334ea Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 19 Feb 2018 15:09:19 +0100 Subject: refactor(#18): Refactor unused transformation receive methods This change removes the unused transformation receive methods from the `Context` class as this functionality can now be easily implemented in the standard library using the newly introduced `sender` property. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 37 ++++++++++------------ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 5 ++- 2 files changed, 18 insertions(+), 24 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 8c1497c8..4d94bf9e 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -53,9 +53,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The message queue. */ - private val queue: Queue = PriorityQueue(Comparator - .comparingLong(MessageContainer::time) - .thenComparingLong(MessageContainer::id)) + private val queue: Queue = PriorityQueue(Comparator + .comparingLong(Envelope::time) + .thenComparingLong(Envelope::id)) /** * The kernel process instance that handles internal operations during the simulation. @@ -199,13 +199,12 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * @property time The point in time to deliver the message. * @property sender The sender of the message. * @property destination The destination of the message. - * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ - private data class MessageContainer(val id: Long, - override val message: Any, - val time: Instant, - override val sender: Entity<*, *>?, - override val destination: Entity<*, *>) : Envelope { + private data class Envelope(val id: Long, + val message: Any, + val time: Instant, + val sender: Entity<*, *>?, + val destination: Entity<*, *>) { /** * A flag to indicate the message has been canceled. */ @@ -217,7 +216,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * * @param envelope The envelope containing the message to schedule. */ - private fun schedule(envelope: MessageContainer) { + private fun schedule(envelope: Envelope) { queue.add(envelope) } @@ -230,9 +229,9 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * @param delay The time to delay the message. */ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null, - delay: Duration): MessageContainer { + delay: Duration): Envelope { require(delay >= 0) { "The amount of time to delay the message must be a positive number" } - return MessageContainer(nextId++, message, time + delay, sender, destination) + return Envelope(nextId++, message, time + delay, sender, destination) } /** @@ -302,20 +301,16 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot /** * The continuation to resume the execution of the process. */ - lateinit var continuation: Continuation> + lateinit var continuation: Continuation /** * The last point in time the process has done some work. */ var last: Instant = -1 - override suspend fun receive(transform: suspend Envelope<*>.(Any) -> T): T { - val envelope = receiveEnvelope() - return transform(envelope, envelope.message) - } - + override suspend fun receive(): Any = receiveEnvelope().message - override suspend fun receive(timeout: Duration, transform: suspend Envelope<*>.(Any) -> T): T? { + override suspend fun receive(timeout: Duration): Any? { val send = prepare(Timeout, process, process, timeout).also { schedule(it) } try { @@ -323,7 +318,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot if (received.message != Timeout) { send.canceled = true - return transform(received, received.message) + return received.message } return null @@ -380,7 +375,7 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot * * @return The envelope containing the message. */ - suspend fun receiveEnvelope() = suspendCoroutine> { continuation = it } + suspend fun receiveEnvelope() = suspendCoroutine { continuation = it } .also { sender = it.sender } // Completion continuation implementation diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 74fa686b..c47f9a26 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -41,9 +41,8 @@ internal class SmokeTest { override val initialState = Unit override suspend fun Context.run() { while (true) { - receive { - sender?.send(message) - } + val msg = receive() + sender?.send(msg) } } } -- cgit v1.2.3 From 86dc826db4cd91b5a6875d9ecdd64c9238d7b95c Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 19 Feb 2018 15:12:16 +0100 Subject: refactor(#18): Simplify Context interface This change simplifies the `Context` interface to reduce the amount of methods required to implement by implementors. --- .../src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 2 -- 1 file changed, 2 deletions(-) (limited to 'opendc-kernel-omega/src') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 4d94bf9e..532a033a 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -327,8 +327,6 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot } } - override suspend fun Entity<*, *>.send(msg: Any, delay: Duration) = send(msg, process, delay) - override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) = schedule(prepare(msg, this, sender, delay)) -- cgit v1.2.3