From 8b53d07898841b328897c60427e6df9f8c71546e Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 24 Oct 2017 17:40:07 +0200 Subject: bug(#15): Fix message passing to stopped processes This change fixes a bug where sending a message to a stopped process (gracefully or forced) would crash the simulation kernel with an UninitializedPropertyAccessException. This was caused by the fact that these processes still existed in the registry, which caused the kernel to lookup a non-existent continuation of a process. This change will make the kernel remove stopped processes from the registry, so they cannot be found anymore. Closes #15 --- .../atlarge/opendc/kernel/omega/OmegaSimulation.kt | 61 ++++++++++------------ .../src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt | 51 +++++++++++++++++- 2 files changed, 78 insertions(+), 34 deletions(-) diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt index caaa335b..48b9d556 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -71,24 +71,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to private val queue: Queue = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** - * The observable state of an [Entity] in simulation, which is provided by the simulation context. + * The context associated with an [Entity]. */ @Suppress("UNCHECKED_CAST") + private val , S> E.context: OmegaContext? + get() = registry[this] as? OmegaContext + + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ override val , S> E.state: S - get() = (resolve(this) as OmegaContext?)?.state ?: initialState + get() = context?.state ?: initialState /** * Initialise the simulator. */ init { - topology.forEach { resolve(it) } - registry.values.forEach { context -> - @Suppress("UNCHECKED_CAST") - val process = context.entity as Process> - - // Start all process co-routines - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) + topology.forEach { entity -> + if (entity is Process<*>) { + @Suppress("UNCHECKED_CAST") + val context = registry.computeIfAbsent(entity, { OmegaContext(entity) }) as OmegaContext + val process = entity as Process<*> + + // Start all process co-routines + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + } } } @@ -117,7 +125,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to continue } - val context = registry[envelope.destination] ?: continue + val context = envelope.destination.context ?: continue if (envelope.message !is Interrupt) { context.continuation.resume(envelope) @@ -180,23 +188,6 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to return wrapped } - /** - * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. - * - * @param entity The [Entity] to resolve the [Context] for. - * @return The [Context] for the given [Entity] or null if the component has no [Process] associated - * with it. - */ - private fun > resolve(entity: E): Context? { - if (entity !is Process<*>) - return null - - @Suppress("UNCHECKED_CAST") - return registry.computeIfAbsent(entity, { - OmegaContext(entity) - }) as Context - } - /** * This internal class provides the default implementation for the [Context] interface for this simulator. */ @@ -243,9 +234,8 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to /** * The observable state of an [Entity] within the simulation is provided by the context of the simulation. */ - @Suppress("UNCHECKED_CAST") override val , S> T.state: S - get() = (resolve(this) as OmegaContext?)?.state ?: initialState + get() = context?.state ?: initialState /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the @@ -388,7 +378,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * * @param value The value to resume with. */ - override fun resume(value: Unit) {} + override fun resume(value: Unit) { + // Deregister process from registry + registry.remove(entity) + } /** * Resume the execution of this continuation with an exception. @@ -396,8 +389,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param exception The exception to resume with. */ override fun resumeWithException(exception: Throwable) { - val currentThread = Thread.currentThread() - currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) + // Deregister process from registry + registry.remove(entity) + + logger.error(exception) { "An exception occurred during the execution of a process" } } } } diff --git a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt index fdc7b033..cb2ce643 100644 --- a/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt +++ b/opendc-omega/src/test/kotlin/nl/atlarge/opendc/SmokeTest.kt @@ -24,8 +24,11 @@ package nl.atlarge.opendc +import nl.atlarge.opendc.kernel.Context +import nl.atlarge.opendc.kernel.Process import nl.atlarge.opendc.kernel.omega.OmegaKernel import nl.atlarge.opendc.topology.AdjacencyList +import nl.atlarge.opendc.topology.Entity import nl.atlarge.opendc.topology.machine.Machine import org.junit.jupiter.api.Test @@ -44,7 +47,7 @@ internal class SmokeTest { val builder = AdjacencyList.builder() repeat(n) { val root = Machine() - val topology = AdjacencyList.builder().construct { + val topology = builder.construct { add(root) val other = Machine() @@ -63,4 +66,50 @@ internal class SmokeTest { simulation.run() } } + + class NullProcess : Entity, Process { + override val initialState = Unit + suspend override fun Context.run() {} + } + + /** + * Test if the kernel allows sending messages to [Process] instances that have already stopped. + */ + @Test + fun `sending message to process that has gracefully stopped`() { + + val builder = AdjacencyList.builder() + val process = NullProcess() + val topology = builder.construct { + add(process) + } + + val simulation = OmegaKernel.create(topology) + simulation.schedule(0, process) + simulation.run() + } + + class CrashProcess : Entity, Process { + override val initialState = Unit + suspend override fun Context.run() { + TODO() + } + } + + /** + * Test if the kernel allows sending messages to [Process] instances that have crashed. + */ + @Test + fun `sending message to process that has crashed`() { + + val builder = AdjacencyList.builder() + val process = CrashProcess() + val topology = builder.construct { + add(process) + } + + val simulation = OmegaKernel.create(topology) + simulation.schedule(0, process) + simulation.run() + } } -- cgit v1.2.3