From 407877ff24d3c54747c8b15bba73b93b38c8b6e7 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 12 Feb 2018 22:27:44 +0100 Subject: refactor(#18): Launch processes at initial run This change will make the simulation kernel launch the processes at the initial run instead of when the processes are registered. --- .../kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 37 ++++++++++++++++------ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 36 ++++++++++++++++++--- 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index c729a63d..91150078 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -55,6 +55,11 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co */ private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) + /** + * The processes to be spawned. + */ + private val spawnings: Queue> = ArrayDeque() + /** * The simulation time. */ @@ -80,14 +85,8 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co return false } - @Suppress("UNCHECKED_CAST") - val process = entity as Process - val context = OmegaContext(entity).also { registry[entity] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - + val process = entity as Process<*, M> + spawnings.add(process) return true } @@ -102,6 +101,17 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override fun step() { while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + val envelope = queue.peek() ?: return val delivery = envelope.time @@ -136,7 +146,7 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override fun run() { - while (queue.isNotEmpty()) { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { step() } } @@ -148,7 +158,7 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co return } - while (time < until && queue.isNotEmpty()) { + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { step() } @@ -219,6 +229,13 @@ internal class OmegaKernel(bootstrap: Bootstrap) : Kernel, Bootstrap.Co override val , S> T.state: S get() = context?.state ?: initialState + /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the * message has been received. diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index a1ec8e88..48b4f02a 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,6 +27,8 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -68,7 +70,7 @@ internal class SmokeTest { kernel.run() } - class NullProcess : Process { + object NullProcess : Process { override val initialState = Unit override suspend fun Context.run() {} } @@ -78,7 +80,7 @@ internal class SmokeTest { */ @Test fun `sending message to process that has gracefully stopped`() { - val process = NullProcess() + val process = NullProcess val bootstrap: Bootstrap = Bootstrap.create { ctx -> process.also { ctx.register(it) @@ -90,7 +92,7 @@ internal class SmokeTest { kernel.run() } - class CrashProcess : Process { + object CrashProcess : Process { override val initialState = Unit override suspend fun Context.run() { TODO("This process should crash") @@ -102,7 +104,7 @@ internal class SmokeTest { */ @Test fun `sending message to process that has crashed`() { - val process = CrashProcess() + val process = CrashProcess val bootstrap: Bootstrap = Bootstrap.create { ctx -> process.also { ctx.register(it) @@ -113,4 +115,30 @@ internal class SmokeTest { val kernel = OmegaKernelFactory.create(bootstrap) kernel.run() } + + class ModelProcess(private val value: Int) : Process { + override val initialState = false + override suspend fun Context.run() { + assertEquals(value, model) + state = true + hold(10) + } + } + /** + * Test if the kernel allows access to the simulation model object. + */ + @Test + fun `access simulation model`() { + val value = 1 + val process = ModelProcess(value) + val bootstrap: Bootstrap = Bootstrap.create { ctx -> + ctx.register(process) + value + } + + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run(5) + + assertTrue(kernel.run { process.state }) + } } -- cgit v1.2.3