diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-06 18:21:13 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-14 12:55:56 +0200 |
| commit | 8a4972730965506cee3cbbdeb2b735454d4b6500 (patch) | |
| tree | 86c5373c07faef732a41401e7cea09d267b3c6d6 /odcsim-engine-omega | |
| parent | 5b48cdbad2493c7af9e79bb9996f195ace3123e5 (diff) | |
feat: Add support for watching actor termination
This change adds support for tracking termination of actors in the
system.
Diffstat (limited to 'odcsim-engine-omega')
| -rw-r--r-- | odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt | 56 |
1 files changed, 37 insertions, 19 deletions
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index c1d487b7..8930fb96 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -35,10 +35,14 @@ import com.atlarge.odcsim.Instant import com.atlarge.odcsim.PostStop import com.atlarge.odcsim.PreStart import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Terminated import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.logging.LoggerImpl +import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl import org.slf4j.Logger +import java.util.Collections import java.util.PriorityQueue +import java.util.WeakHashMap import kotlin.math.max /** @@ -119,7 +123,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun send(msg: T, after: Duration) = schedule(this, msg, after) override fun terminate() { - registry[path]?.stop() + registry[path]?.stop(null) } override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path) @@ -138,6 +142,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> { val children: MutableSet<Actor<*>> = mutableSetOf() val interpreter = BehaviorInterpreter(initialBehavior) + val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>()) override val time: Instant get() = this@OmegaActorSystem.time @@ -167,10 +172,18 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) return false } val ref = registry[child.path] ?: return false - ref.stop() + ref.stop(null) return true } + override fun watch(target: ActorRef<*>) { + registry[target.path]?.watchers?.add(path) + } + + override fun unwatch(target: ActorRef<*>) { + registry[target.path]?.watchers?.remove(path) + } + // Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual // time. override fun sync(target: ActorRef<*>) {} @@ -189,18 +202,13 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) /** * Stop this actor. */ - fun stop() { + fun stop(failure: Throwable?) { interpreter.stop(this) - terminate() - interpreter.interpretSignal(this, PostStop) - } - - /** - * Terminate this actor and its children. - */ - fun terminate() { - children.forEach { it.stop() } + children.forEach { it.stop(failure) } + val termination = Terminated(self, failure) + watchers.forEach { schedule(it, termination, 0.0) } registry.remove(self.path) + interpreter.interpretSignal(this, PostStop) } /** @@ -215,8 +223,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) } if (!interpreter.isAlive) { - terminate() - interpreter.interpretSignal(this, PostStop) + stop(null) } } @@ -232,10 +239,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? { return try { block(this) - } catch (e: Exception) { + } catch (t: Throwable) { // Forcefully stop the actor if it crashed - stop() - log.error("Unhandled exception in actor $path", e) + stop(t) + log.error("Unhandled exception in actor $path", t) null } } @@ -277,12 +284,23 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) /** * Schedule a message to be processed by the engine. * - * @property destination The destination of the message. + * @param destination The destination of the message. * @param message The message to schedule. * @param delay The time to wait before processing the message. */ private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) { + schedule(destination.path, message, delay) + } + + /** + * Schedule a message to be processed by the engine. + * + * @param path The path to the destination of the message. + * @param message The message to schedule. + * @param delay The time to wait before processing the message. + */ + private fun schedule(path: ActorPath, message: Any, delay: Duration) { require(delay >= .0) { "The given delay must be a non-negative number" } - queue.add(EnvelopeImpl(nextId++, destination.path, time + delay, message)) + queue.add(EnvelopeImpl(nextId++, path, time + delay, message)) } } |
