diff options
| -rw-r--r-- | opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt | 61 | ||||
| -rw-r--r-- | opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt | 51 |
2 files changed, 78 insertions, 34 deletions
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt index caaa335b..48b9d556 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -71,24 +71,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** - * The observable state of an [Entity] in simulation, which is provided by the simulation context. + * The context associated with an [Entity]. */ @Suppress("UNCHECKED_CAST") + private val <E : Entity<S>, S> E.context: OmegaContext<E, S>? + get() = registry[this] as? OmegaContext<E, S> + + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ override val <E : Entity<S>, S> E.state: S - get() = (resolve(this) as OmegaContext<E, S>?)?.state ?: initialState + get() = context?.state ?: initialState /** * Initialise the simulator. */ init { - topology.forEach { resolve(it) } - registry.values.forEach { context -> - @Suppress("UNCHECKED_CAST") - val process = context.entity as Process<Entity<*>> - - // Start all process co-routines - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) + topology.forEach { entity -> + if (entity is Process<*>) { + @Suppress("UNCHECKED_CAST") + val context = registry.computeIfAbsent(entity, { OmegaContext(entity) }) as OmegaContext<Nothing, *> + val process = entity as Process<*> + + // Start all process co-routines + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + } } } @@ -117,7 +125,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to continue } - val context = registry[envelope.destination] ?: continue + val context = envelope.destination.context ?: continue if (envelope.message !is Interrupt) { context.continuation.resume(envelope) @@ -181,23 +189,6 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to } /** - * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. - * - * @param entity The [Entity] to resolve the [Context] for. - * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated - * with it. - */ - private fun <E : Entity<*>> resolve(entity: E): Context<E>? { - if (entity !is Process<*>) - return null - - @Suppress("UNCHECKED_CAST") - return registry.computeIfAbsent(entity, { - OmegaContext(entity) - }) as Context<E> - } - - /** * This internal class provides the default implementation for the [Context] interface for this simulator. */ private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, @@ -243,9 +234,8 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to /** * The observable state of an [Entity] within the simulation is provided by the context of the simulation. */ - @Suppress("UNCHECKED_CAST") override val <T : Entity<S>, S> T.state: S - get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState + get() = context?.state ?: initialState /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the @@ -388,7 +378,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * * @param value The value to resume with. */ - override fun resume(value: Unit) {} + override fun resume(value: Unit) { + // Deregister process from registry + registry.remove(entity) + } /** * Resume the execution of this continuation with an exception. @@ -396,8 +389,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param exception The exception to resume with. */ override fun resumeWithException(exception: Throwable) { - val currentThread = Thread.currentThread() - currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) + // Deregister process from registry + registry.remove(entity) + + logger.error(exception) { "An exception occurred during the execution of a process" } } } } diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index fdc7b033..cb2ce643 100644 --- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -24,8 +24,11 @@ package nl.atlarge.opendc +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.omega.OmegaKernel import nl.atlarge.opendc.topology.AdjacencyList +import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine import org.junit.jupiter.api.Test @@ -44,7 +47,7 @@ internal class SmokeTest { val builder = AdjacencyList.builder() repeat(n) { val root = Machine() - val topology = AdjacencyList.builder().construct { + val topology = builder.construct { add(root) val other = Machine() @@ -63,4 +66,50 @@ internal class SmokeTest { simulation.run() } } + + class NullProcess : Entity<Unit>, Process<NullProcess> { + override val initialState = Unit + suspend override fun Context<NullProcess>.run() {} + } + + /** + * Test if the kernel allows sending messages to [Process] instances that have already stopped. + */ + @Test + fun `sending message to process that has gracefully stopped`() { + + val builder = AdjacencyList.builder() + val process = NullProcess() + val topology = builder.construct { + add(process) + } + + val simulation = OmegaKernel.create(topology) + simulation.schedule(0, process) + simulation.run() + } + + class CrashProcess : Entity<Unit>, Process<NullProcess> { + override val initialState = Unit + suspend override fun Context<NullProcess>.run() { + TODO() + } + } + + /** + * Test if the kernel allows sending messages to [Process] instances that have crashed. + */ + @Test + fun `sending message to process that has crashed`() { + + val builder = AdjacencyList.builder() + val process = CrashProcess() + val topology = builder.construct { + add(process) + } + + val simulation = OmegaKernel.create(topology) + simulation.schedule(0, process) + simulation.run() + } } |
