summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-23 12:17:48 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-23 12:17:48 +0100
commitf691a72b12a43fa15c1617966450c55206664797 (patch)
treee76afd3b1d5673a29d71eedb9d373396976d84bd /opendc-kernel-omega/src
parent8666a78b86a40c1d8dab28dd18e841318c01f97f (diff)
parent86dc826db4cd91b5a6875d9ecdd64c9238d7b95c (diff)
refactor(#18): Redesign core simulation API
This change contains the redesign of the core simulation API and provides a cleaner interface for developing simulation models for the users. Closes #18
Diffstat (limited to 'opendc-kernel-omega/src')
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt30
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt47
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt402
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt143
4 files changed, 622 insertions, 0 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt
new file mode 100644
index 00000000..d63a53c8
--- /dev/null
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt
@@ -0,0 +1,30 @@
+package com.atlarge.opendc.omega
+
+import com.atlarge.opendc.simulator.Context
+import com.atlarge.opendc.simulator.Process
+
+/**
+ * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up
+ * and resume execution.
+ *
+ * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to
+ * wake up a process from another entity.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+object Resume
+
+/**
+ * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been
+ * reached and that it should wake up and resume execution.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+object Timeout
+
+/**
+ * An internal message used by the Omega simulation kernel to launch a process.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+data class Launch<M>(val process: Process<*, M>)
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
new file mode 100644
index 00000000..c0ab9fb4
--- /dev/null
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
@@ -0,0 +1,47 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.omega
+
+import com.atlarge.opendc.simulator.Bootstrap
+import com.atlarge.opendc.simulator.kernel.Kernel
+import com.atlarge.opendc.simulator.kernel.Simulation
+
+/**
+ * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
+ *
+ * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
+ * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+object OmegaKernel : Kernel {
+ /**
+ * Create a simulation over the given model facilitated by this simulation kernel.
+ *
+ * @param bootstrap The apply procedure to apply the simulation with.
+ * @return A [Simulation] instance to control the simulation.
+ */
+ override fun <M> create(bootstrap: Bootstrap<M>): Simulation<M> = OmegaSimulation(bootstrap)
+}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
new file mode 100644
index 00000000..532a033a
--- /dev/null
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -0,0 +1,402 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.omega
+
+import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.kernel.Simulation
+import mu.KotlinLogging
+import java.util.*
+import kotlin.coroutines.experimental.*
+
+/**
+ * The Omega simulation kernel is the reference simulation kernel implementation for the OpenDC Simulator core.
+ *
+ * This simulator implementation is a single-threaded implementation, running simulation kernels synchronously and
+ * provides a single priority queue for all events (messages, ticks, etc) that occur in the entities.
+ *
+ * @property model The model that is simulated.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Bootstrap.Context<M> {
+ /**
+ * The logger instance to use for the simulator.
+ */
+ private val logger = KotlinLogging.logger {}
+
+ /**
+ * The registry of the processes used in the simulation.
+ */
+ private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
+
+ /**
+ * The message queue.
+ */
+ private val queue: Queue<Envelope> = PriorityQueue(Comparator
+ .comparingLong(Envelope::time)
+ .thenComparingLong(Envelope::id))
+
+ /**
+ * The kernel process instance that handles internal operations during the simulation.
+ */
+ private val process = object : Process<Unit, M> {
+ override val initialState = Unit
+
+ override suspend fun Context<Unit, M>.run() {
+ while(true) {
+ val msg = receive()
+ when (msg) {
+ is Launch<*> ->
+ @Suppress("UNCHECKED_CAST")
+ launch((msg as Launch<M>).process)
+ }
+ }
+ }
+ }
+
+ /**
+ * The context associated with an [Entity].
+ */
+ @Suppress("UNCHECKED_CAST")
+ private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
+ get() = registry[this] as? OmegaContext<S>
+
+ /**
+ * The simulation time.
+ */
+ override var time: Instant = 0
+
+ /**
+ * The model of simulation.
+ */
+ // XXX: the bootstrap requires the properties of this class to be initialised, so changing the order may cause NPEs
+ override var model: M = bootstrap.apply(this)
+
+ /**
+ * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ */
+ override val <E : Entity<S, *>, S> E.state: S
+ get() = context?.state ?: initialState
+
+ /**
+ * Initialise the simulation instance.
+ */
+ init {
+ // Launch the Omega kernel process
+ launch(process)
+ }
+
+ // Bootstrap Context implementation
+ override fun register(entity: Entity<*, M>): Boolean {
+ if (!registry.containsKey(entity) && entity !is Process) {
+ return false
+ }
+
+ schedule(Launch(entity as Process<*, M>), process)
+ return true
+ }
+
+ override fun deregister(entity: Entity<*, M>): Boolean {
+ val context = entity.context ?: return false
+ context.resume(Unit)
+ return true
+ }
+
+ override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
+ schedule(prepare(message, destination, sender, delay))
+
+ // Simulation implementation
+ override fun step() {
+ while (true) {
+ val envelope = queue.peek() ?: return
+ val delivery = envelope.time
+
+ if (delivery > time) {
+ // Tick has yet to occur
+ // Jump in time to next event
+ time = delivery
+ break
+ } else if (delivery < time) {
+ // Tick has already occurred
+ logger.warn { "Message processed out of order" }
+ }
+
+ queue.poll()
+
+ // If the sender has canceled the message, we move on to the next message
+ if (envelope.canceled) {
+ continue
+ }
+
+ val context = envelope.destination.context ?: continue
+
+ if (envelope.message !is Interrupt) {
+ context.continuation.resume(envelope)
+ } else {
+ context.continuation.resumeWithException(envelope.message)
+ }
+
+ context.last = time
+ }
+ }
+
+ override fun run() {
+ while (queue.isNotEmpty()) {
+ step()
+ }
+ }
+
+ override fun run(until: Instant) {
+ require(until > 0) { "The given instant must be a non-zero positive number" }
+
+ if (time >= until) {
+ return
+ }
+
+ while (time < until && queue.isNotEmpty()) {
+ step()
+ }
+
+ // Fix clock if step() jumped too far in time to give the impression to the user that simulation stopped at
+ // exactly the tick it gave. This has not effect on the actual simulation results as the next call to run() will
+ // just jump forward again.
+ if (time > until) {
+ time = until
+ }
+ }
+
+ /**
+ * The identifier for the next message to be scheduled.
+ */
+ private var nextId: Long = 0
+
+ /**
+ * A wrapper around a message that has been scheduled for processing.
+ *
+ * @property id The identifier of the message to keep the priority queue stable
+ * @property message The message to wrap.
+ * @property time The point in time to deliver the message.
+ * @property sender The sender of the message.
+ * @property destination The destination of the message.
+ */
+ private data class Envelope(val id: Long,
+ val message: Any,
+ val time: Instant,
+ val sender: Entity<*, *>?,
+ val destination: Entity<*, *>) {
+ /**
+ * A flag to indicate the message has been canceled.
+ */
+ internal var canceled: Boolean = false
+ }
+
+ /**
+ * Schedule the given envelope to be processed by the kernel.
+ *
+ * @param envelope The envelope containing the message to schedule.
+ */
+ private fun schedule(envelope: Envelope) {
+ queue.add(envelope)
+ }
+
+ /**
+ * Prepare a message for scheduling by wrapping it into an envelope.
+ *
+ * @param message The message to send.
+ * @param destination The destination entity that should receive the message.
+ * @param sender The optional sender of the message.
+ * @param delay The time to delay the message.
+ */
+ private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
+ delay: Duration): Envelope {
+ require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
+ return Envelope(nextId++, message, time + delay, sender, destination)
+ }
+
+ /**
+ * Launch the given [Process].
+ *
+ * @param process The process to launch.
+ */
+ private fun launch(process: Process<*, M>) {
+ val context = OmegaContext(process).also { registry[process] = it }
+
+ // Bootstrap the process coroutine
+ val block: suspend () -> Unit = { context.start() }
+ block.startCoroutine(context)
+ }
+
+ /**
+ * This internal class provides the default implementation for the [Context] interface for this simulator.
+ */
+ private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit>,
+ AbstractCoroutineContextElement(Context) {
+ /**
+ * The model in which the process exists.
+ */
+ override val model: M
+ get() = this@OmegaSimulation.model
+
+ /**
+ * The current point in simulation time.
+ */
+ override val time: Instant
+ get() = this@OmegaSimulation.time
+
+ /**
+ * The [Entity] associated with this context.
+ */
+ override val self: Entity<S, M>
+ get() = process
+
+ /**
+ * The duration between the current point in simulation time and the last point in simulation time where the
+ * [Context] has executed some work.
+ */
+ override val delta: Duration
+ get() = maxOf(time - last, 0)
+
+ /**
+ * The state of the entity.
+ */
+ override var state: S = process.initialState
+
+ /**
+ * The observable state of an [Entity] within the simulation is provided by the context of the simulation.
+ */
+ override val <T : Entity<S, *>, S> T.state: S
+ get() = context?.state ?: initialState
+
+ /**
+ * The sender of the last received message or `null` in case the process has not received any messages yet.
+ */
+ override var sender: Entity<*, *>? = null
+
+ /**
+ * The [CoroutineContext] for a [Context].
+ */
+ override val context: CoroutineContext = this
+
+ /**
+ * The continuation to resume the execution of the process.
+ */
+ lateinit var continuation: Continuation<Envelope>
+
+ /**
+ * The last point in time the process has done some work.
+ */
+ var last: Instant = -1
+
+ override suspend fun receive(): Any = receiveEnvelope().message
+
+ override suspend fun receive(timeout: Duration): Any? {
+ val send = prepare(Timeout, process, process, timeout).also { schedule(it) }
+
+ try {
+ val received = receiveEnvelope()
+
+ if (received.message != Timeout) {
+ send.canceled = true
+ return received.message
+ }
+
+ return null
+ } finally {
+ send.canceled = true
+ }
+ }
+
+ override suspend fun Entity<*, *>.send(msg: Any, sender: Entity<*, *>, delay: Duration) =
+ schedule(prepare(msg, this, sender, delay))
+
+ override suspend fun Entity<*, *>.interrupt(interrupt: Interrupt) = send(interrupt)
+
+ override suspend fun hold(duration: Duration) {
+ require(duration >= 0) { "The amount of time to hold must be a positive number" }
+ val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
+
+ try {
+ while (true) {
+ if (receive() == Resume)
+ return
+ }
+ } finally {
+ envelope.canceled = true
+ }
+ }
+
+ override suspend fun hold(duration: Duration, queue: Queue<Any>) {
+ require(duration >= 0) { "The amount of time to hold must be a positive number" }
+ val envelope = prepare(Resume, process, process, duration).also { schedule(it) }
+
+ try {
+ while (true) {
+ val msg = receive()
+ if (msg == Resume)
+ return
+ queue.add(msg)
+ }
+ } finally {
+ envelope.canceled = true
+ }
+ }
+
+ /**
+ * Start the process associated with this context.
+ */
+ internal suspend fun start() = process.run { run() }
+
+ /**
+ * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
+ * message has been received.
+ *
+ * @return The envelope containing the message.
+ */
+ suspend fun receiveEnvelope() = suspendCoroutine<Envelope> { continuation = it }
+ .also { sender = it.sender }
+
+ // Completion continuation implementation
+ /**
+ * Resume the execution of this continuation with the given value.
+ *
+ * @param value The value to resume with.
+ */
+ override fun resume(value: Unit) {
+ // Deregister process from registry in order to have the GC collect this context
+ registry.remove(process)
+ }
+
+ /**
+ * Resume the execution of this continuation with an exception.
+ *
+ * @param exception The exception to resume with.
+ */
+ override fun resumeWithException(exception: Throwable) {
+ // Deregister process from registry in order to have the GC collect this context:w
+ registry.remove(process)
+
+ logger.error(exception) { "An exception occurred during the execution of a process" }
+ }
+ }
+}
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
new file mode 100644
index 00000000..c47f9a26
--- /dev/null
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -0,0 +1,143 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.omega
+
+import com.atlarge.opendc.simulator.Bootstrap
+import com.atlarge.opendc.simulator.Context
+import com.atlarge.opendc.simulator.Process
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+/**
+ * This test suite checks for smoke when running a large amount of simulations.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+internal class SmokeTest {
+ class EchoProcess : Process<Unit, Unit> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, Unit>.run() {
+ while (true) {
+ val msg = receive()
+ sender?.send(msg)
+ }
+ }
+ }
+
+ /**
+ * Run a large amount of simulations and test if any exceptions occur.
+ */
+ @Test
+ fun smoke() {
+ val n = 1000
+ val messages = 100
+ val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
+ repeat(n) {
+ EchoProcess().also {
+ ctx.register(it)
+
+ for (i in 1 until messages) {
+ ctx.schedule(i, it, delay = i.toLong())
+ }
+ }
+ }
+ }
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run()
+ }
+
+ object NullProcess : Process<Unit, Unit> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, Unit>.run() {}
+ }
+
+ /**
+ * Test if the kernel allows sending messages to [Context] instances that have already stopped.
+ */
+ @Test
+ fun `sending message to process that has gracefully stopped`() {
+ val process = NullProcess
+ val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
+ process.also {
+ ctx.register(it)
+ ctx.schedule(0, it)
+ }
+ }
+
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run()
+ }
+
+ object CrashProcess : Process<Unit, Unit> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, Unit>.run() {
+ TODO("This process should crash")
+ }
+ }
+
+ /**
+ * Test if the kernel allows sending messages to [Context] instances that have crashed.
+ */
+ @Test
+ fun `sending message to process that has crashed`() {
+ val process = CrashProcess
+ val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
+ process.also {
+ ctx.register(it)
+ ctx.schedule(0, it)
+ }
+ }
+
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run()
+ }
+
+ class ModelProcess(private val value: Int) : Process<Boolean, Int> {
+ override val initialState = false
+ override suspend fun Context<Boolean, Int>.run() {
+ assertEquals(value, model)
+ state = true
+ hold(10)
+ }
+ }
+ /**
+ * Test if the kernel allows access to the simulation model object.
+ */
+ @Test
+ fun `access simulation model`() {
+ val value = 1
+ val process = ModelProcess(value)
+ val bootstrap: Bootstrap<Int> = Bootstrap.create { ctx ->
+ ctx.register(process)
+ value
+ }
+
+ val simulation = OmegaKernel.create(bootstrap)
+ simulation.run(5)
+
+ assertTrue(simulation.run { process.state })
+ }
+}