diff options
Diffstat (limited to 'opendc-kernel-omega/src/main')
| -rw-r--r-- | opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index c729a63d..91150078 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -56,6 +56,11 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** + * The processes to be spawned. + */ + private val spawnings: Queue<Process<*, M>> = ArrayDeque() + + /** * The simulation time. */ override var time: Instant = 0 @@ -80,14 +85,8 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co return false } - @Suppress("UNCHECKED_CAST") - val process = entity as Process<Any, M> - val context = OmegaContext(entity).also { registry[entity] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - + val process = entity as Process<*, M> + spawnings.add(process) return true } @@ -102,6 +101,17 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co override fun step() { while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + val envelope = queue.peek() ?: return val delivery = envelope.time @@ -136,7 +146,7 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co override fun run() { - while (queue.isNotEmpty()) { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { step() } } @@ -148,7 +158,7 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co return } - while (time < until && queue.isNotEmpty()) { + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { step() } @@ -220,6 +230,13 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co get() = context?.state ?: initialState /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + + /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the * message has been received. * |
