summaryrefslogtreecommitdiff
path: root/odcsim-engine-omega
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-04-25 21:28:10 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-13 20:26:47 +0200
commit4e6920d5c128b49750408a11850dfa6a7abb1e9e (patch)
treecc1e718ee8a04a94c3ec7623e977fb681d47dbcf /odcsim-engine-omega
parent38828ab3708a22bbd321c76d936648a2010f5c60 (diff)
feat: Add support for ActorSystem termination
This change adds support for terminating an ActorSystem.
Diffstat (limited to 'odcsim-engine-omega')
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt41
1 files changed, 28 insertions, 13 deletions
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index 5a886a6d..566f1ff5 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -62,7 +62,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
/**
* A flag to indicate the system has started.
*/
- private var isStarted: Boolean = false
+ private var state: ActorSystemState = ActorSystemState.PENDING
/**
* The event queue to process
@@ -90,9 +90,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
require(until >= .0) { "The given instant must be a non-negative number" }
// Start the root actor on initial run
- if (!isStarted) {
- registry[path]!!.start()
- isStarted = true
+ if (state == ActorSystemState.PENDING) {
+ registry[path]!!.isolate { it.start() }
+ state = ActorSystemState.STARTED
+ } else if (state == ActorSystemState.TERMINATED) {
+ throw IllegalStateException("The ActorSystem has been terminated.")
}
while (time < until) {
@@ -107,15 +109,8 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
queue.poll()
val actor = registry[envelope.destination] ?: continue
- try {
- // Notice that messages for unknown/terminated actors are ignored for now
- actor.interpretMessage(envelope.message)
- } catch (e: Exception) {
- // Forcefully stop the actor if it crashed
- actor.stop()
-
- logger.error(e) { "Unhandled exception in actor ${envelope.destination}" }
- }
+ // Notice that messages for unknown/terminated actors are ignored for now
+ actor.isolate { it.interpretMessage(envelope.message) }
}
// Jump forward in time as the caller expects the system to have run until the specified instant
@@ -125,6 +120,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun send(msg: T, after: Duration) = schedule(this, msg, after)
+ override fun terminate() {
+ registry[path]?.stop()
+ }
+
/**
* The identifier for the next message to be scheduled.
*/
@@ -222,6 +221,22 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun hashCode(): Int = self.path.hashCode()
}
+ private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? {
+ return try {
+ block(this)
+ } catch (e: Exception) {
+ // Forcefully stop the actor if it crashed
+ stop()
+ logger.error(e) { "Unhandled exception in actor $path" }
+ null
+ }
+ }
+
+
+ private enum class ActorSystemState {
+ PENDING, STARTED, TERMINATED
+ }
+
private inner class ActorRefImpl<T : Any>(override val path: ActorPath) : ActorRef<T>
/**