summaryrefslogtreecommitdiff
path: root/opendc-omega/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2017-10-24 17:40:07 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2017-10-24 18:06:25 +0200
commit8b53d07898841b328897c60427e6df9f8c71546e (patch)
tree0d8a2b5e399954e7b6138a321752af60cdcf8f60 /opendc-omega/src/main
parentdad4487fddaccf44240f5d62fa83830f7bbf8a5d (diff)
bug(#15): Fix message passing to stopped processes
This change fixes a bug where sending a message to a stopped process (gracefully or forced) would crash the simulation kernel with an UninitializedPropertyAccessException. This was caused by the fact that these processes still existed in the registry, which caused the kernel to lookup a non-existent continuation of a process. This change will make the kernel remove stopped processes from the registry, so they cannot be found anymore. Closes #15
Diffstat (limited to 'opendc-omega/src/main')
-rw-r--r--opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt61
1 files changed, 28 insertions, 33 deletions
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
index caaa335b..48b9d556 100644
--- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
+++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt
@@ -71,24 +71,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time))
/**
- * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ * The context associated with an [Entity].
*/
@Suppress("UNCHECKED_CAST")
+ private val <E : Entity<S>, S> E.context: OmegaContext<E, S>?
+ get() = registry[this] as? OmegaContext<E, S>
+
+ /**
+ * The observable state of an [Entity] in simulation, which is provided by the simulation context.
+ */
override val <E : Entity<S>, S> E.state: S
- get() = (resolve(this) as OmegaContext<E, S>?)?.state ?: initialState
+ get() = context?.state ?: initialState
/**
* Initialise the simulator.
*/
init {
- topology.forEach { resolve(it) }
- registry.values.forEach { context ->
- @Suppress("UNCHECKED_CAST")
- val process = context.entity as Process<Entity<*>>
-
- // Start all process co-routines
- val block: suspend () -> Unit = { process.run { context.run() } }
- block.startCoroutine(context)
+ topology.forEach { entity ->
+ if (entity is Process<*>) {
+ @Suppress("UNCHECKED_CAST")
+ val context = registry.computeIfAbsent(entity, { OmegaContext(entity) }) as OmegaContext<Nothing, *>
+ val process = entity as Process<*>
+
+ // Start all process co-routines
+ val block: suspend () -> Unit = { process.run { context.run() } }
+ block.startCoroutine(context)
+ }
}
}
@@ -117,7 +125,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
continue
}
- val context = registry[envelope.destination] ?: continue
+ val context = envelope.destination.context ?: continue
if (envelope.message !is Interrupt) {
context.continuation.resume(envelope)
@@ -181,23 +189,6 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
}
/**
- * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network.
- *
- * @param entity The [Entity] to resolve the [Context] for.
- * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated
- * with it.
- */
- private fun <E : Entity<*>> resolve(entity: E): Context<E>? {
- if (entity !is Process<*>)
- return null
-
- @Suppress("UNCHECKED_CAST")
- return registry.computeIfAbsent(entity, {
- OmegaContext(entity)
- }) as Context<E>
- }
-
- /**
* This internal class provides the default implementation for the [Context] interface for this simulator.
*/
private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>,
@@ -243,9 +234,8 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
/**
* The observable state of an [Entity] within the simulation is provided by the context of the simulation.
*/
- @Suppress("UNCHECKED_CAST")
override val <T : Entity<S>, S> T.state: S
- get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState
+ get() = context?.state ?: initialState
/**
* Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the
@@ -388,7 +378,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
*
* @param value The value to resume with.
*/
- override fun resume(value: Unit) {}
+ override fun resume(value: Unit) {
+ // Deregister process from registry
+ registry.remove(entity)
+ }
/**
* Resume the execution of this continuation with an exception.
@@ -396,8 +389,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to
* @param exception The exception to resume with.
*/
override fun resumeWithException(exception: Throwable) {
- val currentThread = Thread.currentThread()
- currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
+ // Deregister process from registry
+ registry.remove(entity)
+
+ logger.error(exception) { "An exception occurred during the execution of a process" }
}
}
}