summaryrefslogtreecommitdiff
path: root/odcsim-engine-omega
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-06 18:21:13 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-14 12:55:56 +0200
commit8a4972730965506cee3cbbdeb2b735454d4b6500 (patch)
tree86c5373c07faef732a41401e7cea09d267b3c6d6 /odcsim-engine-omega
parent5b48cdbad2493c7af9e79bb9996f195ace3123e5 (diff)
feat: Add support for watching actor termination
This change adds support for tracking termination of actors in the system.
Diffstat (limited to 'odcsim-engine-omega')
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt56
1 files changed, 37 insertions, 19 deletions
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index c1d487b7..8930fb96 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -35,10 +35,14 @@ import com.atlarge.odcsim.Instant
import com.atlarge.odcsim.PostStop
import com.atlarge.odcsim.PreStart
import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.Terminated
import com.atlarge.odcsim.internal.BehaviorInterpreter
import com.atlarge.odcsim.internal.logging.LoggerImpl
+import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl
import org.slf4j.Logger
+import java.util.Collections
import java.util.PriorityQueue
+import java.util.WeakHashMap
import kotlin.math.max
/**
@@ -119,7 +123,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun send(msg: T, after: Duration) = schedule(this, msg, after)
override fun terminate() {
- registry[path]?.stop()
+ registry[path]?.stop(null)
}
override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
@@ -138,6 +142,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> {
val children: MutableSet<Actor<*>> = mutableSetOf()
val interpreter = BehaviorInterpreter(initialBehavior)
+ val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>())
override val time: Instant
get() = this@OmegaActorSystem.time
@@ -167,10 +172,18 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
return false
}
val ref = registry[child.path] ?: return false
- ref.stop()
+ ref.stop(null)
return true
}
+ override fun watch(target: ActorRef<*>) {
+ registry[target.path]?.watchers?.add(path)
+ }
+
+ override fun unwatch(target: ActorRef<*>) {
+ registry[target.path]?.watchers?.remove(path)
+ }
+
// Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual
// time.
override fun sync(target: ActorRef<*>) {}
@@ -189,18 +202,13 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
/**
* Stop this actor.
*/
- fun stop() {
+ fun stop(failure: Throwable?) {
interpreter.stop(this)
- terminate()
- interpreter.interpretSignal(this, PostStop)
- }
-
- /**
- * Terminate this actor and its children.
- */
- fun terminate() {
- children.forEach { it.stop() }
+ children.forEach { it.stop(failure) }
+ val termination = Terminated(self, failure)
+ watchers.forEach { schedule(it, termination, 0.0) }
registry.remove(self.path)
+ interpreter.interpretSignal(this, PostStop)
}
/**
@@ -215,8 +223,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
}
if (!interpreter.isAlive) {
- terminate()
- interpreter.interpretSignal(this, PostStop)
+ stop(null)
}
}
@@ -232,10 +239,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? {
return try {
block(this)
- } catch (e: Exception) {
+ } catch (t: Throwable) {
// Forcefully stop the actor if it crashed
- stop()
- log.error("Unhandled exception in actor $path", e)
+ stop(t)
+ log.error("Unhandled exception in actor $path", t)
null
}
}
@@ -277,12 +284,23 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
/**
* Schedule a message to be processed by the engine.
*
- * @property destination The destination of the message.
+ * @param destination The destination of the message.
* @param message The message to schedule.
* @param delay The time to wait before processing the message.
*/
private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) {
+ schedule(destination.path, message, delay)
+ }
+
+ /**
+ * Schedule a message to be processed by the engine.
+ *
+ * @param path The path to the destination of the message.
+ * @param message The message to schedule.
+ * @param delay The time to wait before processing the message.
+ */
+ private fun schedule(path: ActorPath, message: Any, delay: Duration) {
require(delay >= .0) { "The given delay must be a non-negative number" }
- queue.add(EnvelopeImpl(nextId++, destination.path, time + delay, message))
+ queue.add(EnvelopeImpl(nextId++, path, time + delay, message))
}
}