diff options
Diffstat (limited to 'opendc-omega/src/main')
| -rw-r--r-- | opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt | 61 |
1 files changed, 28 insertions, 33 deletions
diff --git a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt index caaa335b..48b9d556 100644 --- a/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt +++ b/opendc-omega/src/main/kotlin/nl/atlarge/opendc/kernel/omega/OmegaSimulation.kt @@ -71,24 +71,32 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to private val queue: Queue<MessageContainer> = PriorityQueue(Comparator.comparingLong(MessageContainer::time)) /** - * The observable state of an [Entity] in simulation, which is provided by the simulation context. + * The context associated with an [Entity]. */ @Suppress("UNCHECKED_CAST") + private val <E : Entity<S>, S> E.context: OmegaContext<E, S>? + get() = registry[this] as? OmegaContext<E, S> + + /** + * The observable state of an [Entity] in simulation, which is provided by the simulation context. + */ override val <E : Entity<S>, S> E.state: S - get() = (resolve(this) as OmegaContext<E, S>?)?.state ?: initialState + get() = context?.state ?: initialState /** * Initialise the simulator. */ init { - topology.forEach { resolve(it) } - registry.values.forEach { context -> - @Suppress("UNCHECKED_CAST") - val process = context.entity as Process<Entity<*>> - - // Start all process co-routines - val block: suspend () -> Unit = { process.run { context.run() } } - block.startCoroutine(context) + topology.forEach { entity -> + if (entity is Process<*>) { + @Suppress("UNCHECKED_CAST") + val context = registry.computeIfAbsent(entity, { OmegaContext(entity) }) as OmegaContext<Nothing, *> + val process = entity as Process<*> + + // Start all process co-routines + val block: suspend () -> Unit = { process.run { context.run() } } + block.startCoroutine(context) + } } } @@ -117,7 +125,7 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to continue } - val context = registry[envelope.destination] ?: continue + val context = envelope.destination.context ?: continue if (envelope.message !is Interrupt) { context.continuation.resume(envelope) @@ -181,23 +189,6 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to } /** - * Resolve the given [Context], given an [Entity] in a logical topology of a cloud network. - * - * @param entity The [Entity] to resolve the [Context] for. - * @return The [Context] for the given [Entity] or <code>null</code> if the component has no [Process] associated - * with it. - */ - private fun <E : Entity<*>> resolve(entity: E): Context<E>? { - if (entity !is Process<*>) - return null - - @Suppress("UNCHECKED_CAST") - return registry.computeIfAbsent(entity, { - OmegaContext(entity) - }) as Context<E> - } - - /** * This internal class provides the default implementation for the [Context] interface for this simulator. */ private inner class OmegaContext<out E : Entity<S>, S>(override val entity: E) : Context<E>, @@ -243,9 +234,8 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to /** * The observable state of an [Entity] within the simulation is provided by the context of the simulation. */ - @Suppress("UNCHECKED_CAST") override val <T : Entity<S>, S> T.state: S - get() = (resolve(this) as OmegaContext<T, S>?)?.state ?: initialState + get() = context?.state ?: initialState /** * Retrieve and remove and single message from the mailbox of the [Entity] and suspend the [Process] until the @@ -388,7 +378,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * * @param value The value to resume with. */ - override fun resume(value: Unit) {} + override fun resume(value: Unit) { + // Deregister process from registry + registry.remove(entity) + } /** * Resume the execution of this continuation with an exception. @@ -396,8 +389,10 @@ internal class OmegaSimulation(override val kernel: OmegaKernel, override val to * @param exception The exception to resume with. */ override fun resumeWithException(exception: Throwable) { - val currentThread = Thread.currentThread() - currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) + // Deregister process from registry + registry.remove(entity) + + logger.error(exception) { "An exception occurred during the execution of a process" } } } } |
