summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-15 16:18:55 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-15 16:18:55 +0100
commit99dc5592bbca0f7d3a9d411268608c112b135896 (patch)
treee92cf0012b6dfab09951f69f1f672ae589d02a04
parent157d30beb52c75831e29a1a22c199b95d6d30b42 (diff)
refactor(#18): Handle process launch using a kernel process
This change will make the kernel handle the launch of processes using a kernel process that is launched at the start of the simulation and launches processes when it receives `Launch` messages.
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt48
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt30
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt142
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt39
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt35
5 files changed, 129 insertions, 165 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt
deleted file mode 100644
index 32f27111..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/MessageContainer.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Entity
-import com.atlarge.opendc.simulator.Envelope
-import com.atlarge.opendc.simulator.Instant
-
-/**
- * A wrapper around a message that has been scheduled for processing.
- *
- * @property message The message to wrap.
- * @property time The point in time to deliver the message.
- * @property sender The sender of the message.
- * @property destination The destination of the message.
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-internal data class MessageContainer(override val message: Any,
- val time: Instant,
- override val sender: Entity<*, *>?,
- override val destination: Entity<*, *>) : Envelope<Any> {
- /**
- * A flag to indicate the message has been canceled.
- */
- internal var canceled: Boolean = false
-}
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt
new file mode 100644
index 00000000..73c3676f
--- /dev/null
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Messages.kt
@@ -0,0 +1,30 @@
+package com.atlarge.opendc.omega
+
+import com.atlarge.opendc.simulator.Context
+import com.atlarge.opendc.simulator.Process
+
+/**
+ * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up
+ * and resume execution.
+ *
+ * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to
+ * wake up a process from another entity.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+object Resume
+
+/**
+ * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been
+ * reached and that it should wake up and resume execution.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+object Timeout
+
+/**
+ * An internal message used by the Omega simulation kernel to launch a process.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+data class Launch<in M>(val process: Process<*, M>)
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 80ac4600..71b20e34 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -46,7 +46,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
private val logger = KotlinLogging.logger {}
/**
- * The registry of the simulation kernels used in the experiment.
+ * The registry of the processes used in the simulation.
*/
private val registry: MutableMap<Entity<*, *>, OmegaContext<*>> = HashMap()
@@ -56,9 +56,29 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
/**
- * The processes to be spawned.
+ * The kernel process instance that handles internal operations during the simulation.
*/
- private val spawnings: Queue<Process<*, M>> = ArrayDeque()
+ private val process = object : Process<Unit, M> {
+ override val initialState = Unit
+
+ override suspend fun Context<Unit, M>.run() {
+ while(true) {
+ val msg = receive()
+ when (msg) {
+ is Launch<*> ->
+ @Suppress("UNCHECKED_CAST")
+ launch((msg as Launch<M>).process)
+ }
+ }
+ }
+ }
+
+ /**
+ * The context associated with an [Entity].
+ */
+ @Suppress("UNCHECKED_CAST")
+ private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
+ get() = registry[this] as? OmegaContext<S>
/**
* The simulation time.
@@ -68,25 +88,30 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
/**
* The model of simulation.
*/
- override val model: M = bootstrap.apply(this)
+ // XXX: the bootstrap requires the properties of this class to be initialised, so changing the order may cause NPEs
+ override var model: M = bootstrap.apply(this)
+ /**
+ * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ */
override val <E : Entity<S, *>, S> E.state: S
get() = context?.state ?: initialState
/**
- * The context associated with an [Entity].
+ * Initialise the simulation instance.
*/
- @Suppress("UNCHECKED_CAST")
- private val <E : Entity<S, M>, S, M> E.context: OmegaContext<S>?
- get() = registry[this] as? OmegaContext<S>
+ init {
+ // Launch the Omega kernel process
+ launch(process)
+ }
+ // Bootstrap Context implementation
override fun register(entity: Entity<*, M>): Boolean {
if (!registry.containsKey(entity) && entity !is Process) {
return false
}
- val process = entity as Process<*, M>
- spawnings.add(process)
+ schedule(Launch(entity as Process<*, M>), process)
return true
}
@@ -99,19 +124,9 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
override fun schedule(message: Any, destination: Entity<*, *>, sender: Entity<*, *>?, delay: Duration) =
schedule(prepare(message, destination, sender, delay))
+ // Simulation implementation
override fun step() {
while (true) {
- // Initialise all spawned processes
- while (spawnings.isNotEmpty()) {
- val process = spawnings.poll()
- val context = OmegaContext(process).also { registry[process] = it }
-
- // Bootstrap the process coroutine
- val block: suspend () -> Unit = { context.start() }
- block.startCoroutine(context)
-
- }
-
val envelope = queue.peek() ?: return
val delivery = envelope.time
@@ -144,9 +159,8 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
}
}
-
override fun run() {
- while (queue.isNotEmpty() || spawnings.isNotEmpty()) {
+ while (queue.isNotEmpty()) {
step()
}
}
@@ -158,7 +172,7 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
return
}
- while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) {
+ while (time < until && queue.isNotEmpty()) {
step()
}
@@ -170,10 +184,42 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
}
}
+ /**
+ * A wrapper around a message that has been scheduled for processing.
+ *
+ * @property message The message to wrap.
+ * @property time The point in time to deliver the message.
+ * @property sender The sender of the message.
+ * @property destination The destination of the message.
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+ private data class MessageContainer(override val message: Any,
+ val time: Instant,
+ override val sender: Entity<*, *>?,
+ override val destination: Entity<*, *>) : Envelope<Any> {
+ /**
+ * A flag to indicate the message has been canceled.
+ */
+ internal var canceled: Boolean = false
+ }
+
+ /**
+ * Schedule the given envelope to be processed by the kernel.
+ *
+ * @param envelope The envelope containing the message to schedule.
+ */
private fun schedule(envelope: MessageContainer) {
queue.add(envelope)
}
+ /**
+ * Prepare a message for scheduling by wrapping it into an envelope.
+ *
+ * @param message The message to send.
+ * @param destination The destination entity that should receive the message.
+ * @param sender The optional sender of the message.
+ * @param delay The time to delay the message.
+ */
private fun prepare(message: Any, destination: Entity<*, *>, sender: Entity<*, *>? = null,
delay: Duration): MessageContainer {
require(delay >= 0) { "The amount of time to delay the message must be a positive number" }
@@ -181,19 +227,22 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
}
/**
- * This internal class provides the default implementation for the [Context] interface for this simulator.
+ * Launch the given [Process].
+ *
+ * @param process The process to launch.
*/
- private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> {
- /**
- * The continuation to resume the execution of the process.
- */
- lateinit var continuation: Continuation<Envelope<*>>
+ private fun launch(process: Process<*, M>) {
+ val context = OmegaContext(process).also { registry[process] = it }
- /**
- * The last point in time the process has done some work.
- */
- var last: Instant = -1
+ // Bootstrap the process coroutine
+ val block: suspend () -> Unit = { context.start() }
+ block.startCoroutine(context)
+ }
+ /**
+ * This internal class provides the default implementation for the [Context] interface for this simulator.
+ */
+ private inner class OmegaContext<S>(val process: Process<S, M>) : Context<S, M>, Continuation<Unit> {
/**
* The model in which the process exists.
*/
@@ -230,19 +279,14 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
get() = context?.state ?: initialState
/**
- * Start the process associated with this context.
+ * The continuation to resume the execution of the process.
*/
- internal suspend fun start() = process.run {
- run()
- }
+ lateinit var continuation: Continuation<Envelope<*>>
/**
- * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
- * message has been received.
- *
- * @return The envelope containing the message.
+ * The last point in time the process has done some work.
*/
- suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it }
+ var last: Instant = -1
override suspend fun <T> receive(transform: suspend Envelope<*>.(Any) -> T): T {
val envelope = receiveEnvelope()
@@ -304,6 +348,18 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
}
}
+ /**
+ * Start the process associated with this context.
+ */
+ internal suspend fun start() = process.run { run() }
+
+ /**
+ * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
+ * message has been received.
+ *
+ * @return The envelope containing the message.
+ */
+ suspend fun receiveEnvelope(): Envelope<*> = suspendCoroutine { continuation = it }
// Completion continuation implementation
/**
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt
deleted file mode 100644
index d4bd8536..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Resume.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Context
-import com.atlarge.opendc.simulator.Process
-
-/**
- * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that it should wake up
- * and resume execution.
- *
- * This message is not guaranteed to work on other simulation kernels and [Context.interrupt] should be preferred to
- * wake up a process from another entity.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-object Resume
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt
deleted file mode 100644
index c205f6b5..00000000
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/Timeout.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2017 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.opendc.omega
-
-import com.atlarge.opendc.simulator.Process
-
-/**
- * An internal message used by the Omega simulation kernel to indicate to a suspended [Process], that a timeout has been
- * reached and that it should wake up and resume execution.
- *
- * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
- */
-object Timeout