diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-04-25 21:28:10 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-13 20:26:47 +0200 |
| commit | 4e6920d5c128b49750408a11850dfa6a7abb1e9e (patch) | |
| tree | cc1e718ee8a04a94c3ec7623e977fb681d47dbcf /odcsim-engine-omega/src | |
| parent | 38828ab3708a22bbd321c76d936648a2010f5c60 (diff) | |
feat: Add support for ActorSystem termination
This change adds support for terminating an ActorSystem.
Diffstat (limited to 'odcsim-engine-omega/src')
| -rw-r--r-- | odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt | 41 |
1 files changed, 28 insertions, 13 deletions
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index 5a886a6d..566f1ff5 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -62,7 +62,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) /** * A flag to indicate the system has started. */ - private var isStarted: Boolean = false + private var state: ActorSystemState = ActorSystemState.PENDING /** * The event queue to process @@ -90,9 +90,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) require(until >= .0) { "The given instant must be a non-negative number" } // Start the root actor on initial run - if (!isStarted) { - registry[path]!!.start() - isStarted = true + if (state == ActorSystemState.PENDING) { + registry[path]!!.isolate { it.start() } + state = ActorSystemState.STARTED + } else if (state == ActorSystemState.TERMINATED) { + throw IllegalStateException("The ActorSystem has been terminated.") } while (time < until) { @@ -107,15 +109,8 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) queue.poll() val actor = registry[envelope.destination] ?: continue - try { - // Notice that messages for unknown/terminated actors are ignored for now - actor.interpretMessage(envelope.message) - } catch (e: Exception) { - // Forcefully stop the actor if it crashed - actor.stop() - - logger.error(e) { "Unhandled exception in actor ${envelope.destination}" } - } + // Notice that messages for unknown/terminated actors are ignored for now + actor.isolate { it.interpretMessage(envelope.message) } } // Jump forward in time as the caller expects the system to have run until the specified instant @@ -125,6 +120,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun send(msg: T, after: Duration) = schedule(this, msg, after) + override fun terminate() { + registry[path]?.stop() + } + /** * The identifier for the next message to be scheduled. */ @@ -222,6 +221,22 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun hashCode(): Int = self.path.hashCode() } + private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? { + return try { + block(this) + } catch (e: Exception) { + // Forcefully stop the actor if it crashed + stop() + logger.error(e) { "Unhandled exception in actor $path" } + null + } + } + + + private enum class ActorSystemState { + PENDING, STARTED, TERMINATED + } + private inner class ActorRefImpl<T : Any>(override val path: ActorPath) : ActorRef<T> /** |
