summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt37
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt36
2 files changed, 59 insertions, 14 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
index c729a63d..91150078 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaKernel.kt
@@ -56,6 +56,11 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co
private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
/**
+ * The processes to be spawned.
+ */
+ private val spawnings: Queue<Process<*, M>> = ArrayDeque()
+
+ /**
* The simulation time.
*/
override var time: Instant = 0
@@ -80,14 +85,8 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co
return false
}
- @Suppress("UNCHECKED_CAST")
- val process = entity as Process<Any, M>
- val context = OmegaContext(entity).also { registry[entity] = it }
-
- // Bootstrap the process coroutine
- val block: suspend () -> Unit = { process.run { context.run() } }
- block.startCoroutine(context)
-
+ val process = entity as Process<*, M>
+ spawnings.add(process)
return true
}
@@ -102,6 +101,17 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co
override fun step() {
while (true) {
+ // Initialise all spawned processes
+ while (spawnings.isNotEmpty()) {
+ val process = spawnings.poll()
+ val context = OmegaContext(process).also { registry[process] = it }
+
+ // Bootstrap the process coroutine
+ val block: suspend () -> Unit = { context.start() }
+ block.startCoroutine(context)
+
+ }
+
val envelope = queue.peek() ?: return
val delivery = envelope.time
@@ -136,7 +146,7 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co
override fun run() {
- while (queue.isNotEmpty()) {
+ while (queue.isNotEmpty() || spawnings.isNotEmpty()) {
step()
}
}
@@ -148,7 +158,7 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co
return
}
- while (time < until && queue.isNotEmpty()) {
+ while (time < until && (queue.isNotEmpty() || spawnings.isNotEmpty())) {
step()
}
@@ -220,6 +230,13 @@ internal class OmegaKernel<M>(bootstrap: Bootstrap<M>) : Kernel<M>, Bootstrap.Co
get() = context?.state ?: initialState
/**
+ * Start the process associated with this context.
+ */
+ internal suspend fun start() = process.run {
+ run()
+ }
+
+ /**
* Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Context] until the
* message has been received.
*
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index a1ec8e88..48b4f02a 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -27,6 +27,8 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
/**
@@ -68,7 +70,7 @@ internal class SmokeTest {
kernel.run()
}
- class NullProcess : Process<Unit, Unit> {
+ object NullProcess : Process<Unit, Unit> {
override val initialState = Unit
override suspend fun Context<Unit, Unit>.run() {}
}
@@ -78,7 +80,7 @@ internal class SmokeTest {
*/
@Test
fun `sending message to process that has gracefully stopped`() {
- val process = NullProcess()
+ val process = NullProcess
val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
process.also {
ctx.register(it)
@@ -90,7 +92,7 @@ internal class SmokeTest {
kernel.run()
}
- class CrashProcess : Process<Unit, Unit> {
+ object CrashProcess : Process<Unit, Unit> {
override val initialState = Unit
override suspend fun Context<Unit, Unit>.run() {
TODO("This process should crash")
@@ -102,7 +104,7 @@ internal class SmokeTest {
*/
@Test
fun `sending message to process that has crashed`() {
- val process = CrashProcess()
+ val process = CrashProcess
val bootstrap: Bootstrap<Unit> = Bootstrap.create { ctx ->
process.also {
ctx.register(it)
@@ -113,4 +115,30 @@ internal class SmokeTest {
val kernel = OmegaKernelFactory.create(bootstrap)
kernel.run()
}
+
+ class ModelProcess(private val value: Int) : Process<Boolean, Int> {
+ override val initialState = false
+ override suspend fun Context<Boolean, Int>.run() {
+ assertEquals(value, model)
+ state = true
+ hold(10)
+ }
+ }
+ /**
+ * Test if the kernel allows access to the simulation model object.
+ */
+ @Test
+ fun `access simulation model`() {
+ val value = 1
+ val process = ModelProcess(value)
+ val bootstrap: Bootstrap<Int> = Bootstrap.create { ctx ->
+ ctx.register(process)
+ value
+ }
+
+ val kernel = OmegaKernelFactory.create(bootstrap)
+ kernel.run(5)
+
+ assertTrue(kernel.run { process.state })
+ }
}