diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-12 22:27:44 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-12 22:27:44 +0100 |
| commit | 407877ff24d3c54747c8b15bba73b93b38c8b6e7 (patch) | |
| tree | aae7eb608c97877220bb157ef54e527484e466ad | |
| parent | e97d9bf3f2cccf19a21631e26d55d60c9f4d7c7a (diff) | |
refactor(#18): Launch processes at initial run
This change will make the simulation kernel launch the processes at the
initial run instead of when the processes are registered.
| -rw-r--r-- | opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt | 37 | ||||
| -rw-r--r-- | opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 36 |
2 files changed, 59 insertions, 14 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt index c729a63d..91150078 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt @@ -56,6 +56,11 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** + * The processes to be spawned. + */ + private val spawnings: Queue<Process<*, M>> = ArrayDeque() + + /** * The simulation time. */ override var time: Instant = 0 @@ -80,14 +85,8 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co return false } - @Suppress("UNCHECKED_CAST") - val process = entity as Process<Any, M> - val context = OmegaContext(entity).also { registry[entity] = it } - - // Bootstrap the process coroutine - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) - + val process = entity as Process<*, M> + spawnings.add(process) return true } @@ -102,6 +101,17 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co override fun step() { while (true) { + // Initialise all spawned processes + while (spawnings.isNotEmpty()) { + val process = spawnings.poll() + val context = OmegaContext(process).also { registry[process] = it } + + // Bootstrap the process coroutine + val block: suspend () -> Unit = { context.start() } + block.startCoroutine(context) + + } + val envelope = queue.peek() ?: return val delivery = envelope.time @@ -136,7 +146,7 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co override fun run() { - while (queue.isNotEmpty()) { + while (queue.isNotEmpty() || spawnings.isNotEmpty()) { step() } } @@ -148,7 +158,7 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co return } - while (time < until && queue.isNotEmpty()) { + while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) { step() } @@ -220,6 +230,13 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co get() = context?.state ?: initialState /** + * Start the process associated with this context. + */ + internal suspend fun start() = process.run { + run() + } + + /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the * message has been received. * diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index a1ec8e88..48b4f02a 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,6 +27,8 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -68,7 +70,7 @@ internal class SmokeTest { kernel.run() } - class NullProcess : Process<Unit, Unit> { + object NullProcess : Process<Unit, Unit> { override val initialState = Unit override suspend fun Context<Unit, Unit>.run() {} } @@ -78,7 +80,7 @@ internal class SmokeTest { */ @Test fun `sending message to process that has gracefully stopped`() { - val process = NullProcess() + val process = NullProcess val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx -> process.also { ctx.register(it) @@ -90,7 +92,7 @@ internal class SmokeTest { kernel.run() } - class CrashProcess : Process<Unit, Unit> { + object CrashProcess : Process<Unit, Unit> { override val initialState = Unit override suspend fun Context<Unit, Unit>.run() { TODO("This process should crash") @@ -102,7 +104,7 @@ internal class SmokeTest { */ @Test fun `sending message to process that has crashed`() { - val process = CrashProcess() + val process = CrashProcess val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx -> process.also { ctx.register(it) @@ -113,4 +115,30 @@ internal class SmokeTest { val kernel = OmegaKernelFactory.create(bootstrap) kernel.run() } + + class ModelProcess(private val value: Int) : Process<Boolean, Int> { + override val initialState = false + override suspend fun Context<Boolean, Int>.run() { + assertEquals(value, model) + state = true + hold(10) + } + } + /** + * Test if the kernel allows access to the simulation model object. + */ + @Test + fun `access simulation model`() { + val value = 1 + val process = ModelProcess(value) + val bootstrap: Bootstrap<Int> = Bootstrap.create { ctx -> + ctx.register(process) + value + } + + val kernel = OmegaKernelFactory.create(bootstrap) + kernel.run(5) + + assertTrue(kernel.run { process.state }) + } } |
