summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt61
-rw-r--r--opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt51
2 files changed, 78 insertions, 34 deletions
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
index caaa335b..48b9d556 100644
--- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
+++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
@@ -71,24 +71,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
/**
- * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ * The context associated with an [Entity].
*/
@Suppress("UNCHECKED_CAST")
+ private val <E : Entity<S>, S> E.context: OmegaContext<E, S>?
+ get() = registry[this] as? OmegaContext<E, S>
+
+ /**
+ * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ */
override val <E : Entity<S>, S> E.state: S
- get() = (resolve(this) as OmegaContext<E, S>?)?.state ?: initialState
+ get() = context?.state ?: initialState
/**
* Initialise the simulator.
*/
init {
- topology.forEach { resolve(it) }
- registry.values.forEach { context ->
- @Suppress("UNCHECKED_CAST")
- val process = context.entity as Process<Entity<*>>
-
- // Start all process co-routines
- val block: suspend () -> Unit = { process.run { context.run() } }
- block.startCoroutine(context)
+ topology.forEach { entity ->
+ if (entity is Process<*>) {
+ @Suppress("UNCHECKED_CAST")
+ val context = registry.computeIfAbsent(entity, { OmegaContext(entity) }) as OmegaContext<Nothing, *>
+ val process = entity as Process<*>
+
+ // Start all process co-routines
+ val block: suspend () -> Unit = { process.run { context.run() } }
+ block.startCoroutine(context)
+ }
}
}
@@ -117,7 +125,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
continue
}
- val context = registry[envelope.destination] ?: continue
+ val context = envelope.destination.context ?: continue
if (envelope.message !is Interrupt) {
context.continuation.resume(envelope)
@@ -181,23 +189,6 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
}
/**
- * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network.
- *
- * @param entity The [Entity] to resolve the [Context] for.
- * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated
- * with it.
- */
- private fun <E : Entity<*>> resolve(entity: E): Context<E>? {
- if (entity !is Process<*>)
- return null
-
- @Suppress("UNCHECKED_CAST")
- return registry.computeIfAbsent(entity, {
- OmegaContext(entity)
- }) as Context<E>
- }
-
- /**
* This internal class provides the default implementation for the [Context] interface for this simulator.
*/
private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>,
@@ -243,9 +234,8 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
/**
* The observable state of an [Entity] within the simulation is provided by the context of the simulation.
*/
- @Suppress("UNCHECKED_CAST")
override val <T : Entity<S>, S> T.state: S
- get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState
+ get() = context?.state ?: initialState
/**
* Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the
@@ -388,7 +378,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
*
* @param value The value to resume with.
*/
- override fun resume(value: Unit) {}
+ override fun resume(value: Unit) {
+ // Deregister process from registry
+ registry.remove(entity)
+ }
/**
* Resume the execution of this continuation with an exception.
@@ -396,8 +389,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
* @param exception The exception to resume with.
*/
override fun resumeWithException(exception: Throwable) {
- val currentThread = Thread.currentThread()
- currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
+ // Deregister process from registry
+ registry.remove(entity)
+
+ logger.error(exception) { "An exception occurred during the execution of a process" }
}
}
}
diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
index fdc7b033..cb2ce643 100644
--- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
+++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt
@@ -24,8 +24,11 @@
package nl.atlarge.opendc
+import nl.atlarge.opendc.kernel.Context
+import nl.atlarge.opendc.kernel.Process
import nl.atlarge.opendc.kernel.omega.OmegaKernel
import nl.atlarge.opendc.topology.AdjacencyList
+import nl.atlarge.opendc.topology.Entity
import nl.atlarge.opendc.topology.machine.Machine
import org.junit.jupiter.api.Test
@@ -44,7 +47,7 @@ internal class SmokeTest {
val builder = AdjacencyList.builder()
repeat(n) {
val root = Machine()
- val topology = AdjacencyList.builder().construct {
+ val topology = builder.construct {
add(root)
val other = Machine()
@@ -63,4 +66,50 @@ internal class SmokeTest {
simulation.run()
}
}
+
+ class NullProcess : Entity<Unit>, Process<NullProcess> {
+ override val initialState = Unit
+ suspend override fun Context<NullProcess>.run() {}
+ }
+
+ /**
+ * Test if the kernel allows sending messages to [Process] instances that have already stopped.
+ */
+ @Test
+ fun `sending message to process that has gracefully stopped`() {
+
+ val builder = AdjacencyList.builder()
+ val process = NullProcess()
+ val topology = builder.construct {
+ add(process)
+ }
+
+ val simulation = OmegaKernel.create(topology)
+ simulation.schedule(0, process)
+ simulation.run()
+ }
+
+ class CrashProcess : Entity<Unit>, Process<NullProcess> {
+ override val initialState = Unit
+ suspend override fun Context<NullProcess>.run() {
+ TODO()
+ }
+ }
+
+ /**
+ * Test if the kernel allows sending messages to [Process] instances that have crashed.
+ */
+ @Test
+ fun `sending message to process that has crashed`() {
+
+ val builder = AdjacencyList.builder()
+ val process = CrashProcess()
+ val topology = builder.construct {
+ add(process)
+ }
+
+ val simulation = OmegaKernel.create(topology)
+ simulation.schedule(0, process)
+ simulation.run()
+ }
}