diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-06 18:21:13 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-14 12:55:56 +0200 |
| commit | 8a4972730965506cee3cbbdeb2b735454d4b6500 (patch) | |
| tree | 86c5373c07faef732a41401e7cea09d267b3c6d6 | |
| parent | 5b48cdbad2493c7af9e79bb9996f195ace3123e5 (diff) | |
feat: Add support for watching actor termination
This change adds support for tracking termination of actors in the
system.
8 files changed, 111 insertions, 24 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index 3a704f32..8ea4a09d 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -84,6 +84,23 @@ interface ActorContext<T : Any> { fun stop(child: ActorRef<*>): Boolean /** + * Watch the specified [ActorRef] for termination of the referenced actor. On termination of the watched actor, + * a [Terminated] signal is sent to this actor. + * + * @param target The target actor to watch. + */ + fun watch(target: ActorRef<*>) + + /** + * Revoke the registration established by [watch]. + * + * In case there exists no registration for the specified [target], no action will be performed. + * + * @param target The target actor to unwatch. + */ + fun unwatch(target: ActorRef<*>) + + /** * Synchronize the local virtual time of this target with the other referenced actor's local virtual time. * * By default, actors are not guaranteed to be synchronized, meaning that for some implementations, virtual time may diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 8298ed28..9ad7f83f 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -167,7 +167,7 @@ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior /** * A flag to indicate whether the last message/signal went unhandled. */ -val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior +val <T : Any> Behavior<T>.isUnhandled get() = this is UnhandledBehavior // The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending // it. diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt index 35aad18b..e393c6a1 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt @@ -48,4 +48,12 @@ object PostStop : Signal * * @property target The target object that has timed out. */ -class Timeout(val target: Any) : Signal +data class Timeout(val target: Any) : Signal + +/** + * A lifecycle signal to indicate that an actor that was watched has terminated. + * + * @property ref The reference to the actor that has terminated. + * @property failure The failure that caused the termination, or `null` on graceful termination. + */ +data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt index 1d91f5b8..694a107c 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -28,6 +28,7 @@ import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Timeout import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.internal.sendSignal import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.setup import com.atlarge.odcsim.unhandled @@ -43,7 +44,7 @@ suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) = setup { ctx -> val target = this @Suppress("UNCHECKED_CAST") - ctx.send(ctx.self, Timeout(target) as T, after) + ctx.sendSignal(ctx.self, Timeout(target), after) receiveSignal { _, signal -> if (signal is Timeout && signal.target == target) { cont.resume(Unit) diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 8d533e9e..a9e20d0e 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -68,7 +68,8 @@ interface SuspendingActorContextImpl<T : Any> : SuspendingActorContext<T> { */ internal class SuspendingBehaviorImpl<T : Any>( private var actorContext: ActorContext<T>, - initialBehavior: SuspendingBehavior<T>) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { + initialBehavior: SuspendingBehavior<T> +) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { /** * The next behavior to use. @@ -106,6 +107,10 @@ internal class SuspendingBehaviorImpl<T : Any>( override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun watch(target: ActorRef<*>) = actorContext.watch(target) + + override fun unwatch(target: ActorRef<*>) = actorContext.unwatch(target) + override fun sync(target: ActorRef<*>) = actorContext.sync(target) override fun unsync(target: ActorRef<*>) = actorContext.unsync(target) diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index c1d487b7..8930fb96 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -35,10 +35,14 @@ import com.atlarge.odcsim.Instant import com.atlarge.odcsim.PostStop import com.atlarge.odcsim.PreStart import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Terminated import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.logging.LoggerImpl +import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl import org.slf4j.Logger +import java.util.Collections import java.util.PriorityQueue +import java.util.WeakHashMap import kotlin.math.max /** @@ -119,7 +123,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun send(msg: T, after: Duration) = schedule(this, msg, after) override fun terminate() { - registry[path]?.stop() + registry[path]?.stop(null) } override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path) @@ -138,6 +142,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> { val children: MutableSet<Actor<*>> = mutableSetOf() val interpreter = BehaviorInterpreter(initialBehavior) + val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>()) override val time: Instant get() = this@OmegaActorSystem.time @@ -167,10 +172,18 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) return false } val ref = registry[child.path] ?: return false - ref.stop() + ref.stop(null) return true } + override fun watch(target: ActorRef<*>) { + registry[target.path]?.watchers?.add(path) + } + + override fun unwatch(target: ActorRef<*>) { + registry[target.path]?.watchers?.remove(path) + } + // Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual // time. override fun sync(target: ActorRef<*>) {} @@ -189,18 +202,13 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) /** * Stop this actor. */ - fun stop() { + fun stop(failure: Throwable?) { interpreter.stop(this) - terminate() - interpreter.interpretSignal(this, PostStop) - } - - /** - * Terminate this actor and its children. - */ - fun terminate() { - children.forEach { it.stop() } + children.forEach { it.stop(failure) } + val termination = Terminated(self, failure) + watchers.forEach { schedule(it, termination, 0.0) } registry.remove(self.path) + interpreter.interpretSignal(this, PostStop) } /** @@ -215,8 +223,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) } if (!interpreter.isAlive) { - terminate() - interpreter.interpretSignal(this, PostStop) + stop(null) } } @@ -232,10 +239,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? { return try { block(this) - } catch (e: Exception) { + } catch (t: Throwable) { // Forcefully stop the actor if it crashed - stop() - log.error("Unhandled exception in actor $path", e) + stop(t) + log.error("Unhandled exception in actor $path", t) null } } @@ -277,12 +284,23 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) /** * Schedule a message to be processed by the engine. * - * @property destination The destination of the message. + * @param destination The destination of the message. * @param message The message to schedule. * @param delay The time to wait before processing the message. */ private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) { + schedule(destination.path, message, delay) + } + + /** + * Schedule a message to be processed by the engine. + * + * @param path The path to the destination of the message. + * @param message The message to schedule. + * @param delay The time to wait before processing the message. + */ + private fun schedule(path: ActorPath, message: Any, delay: Duration) { require(delay >= .0) { "The given delay must be a non-negative number" } - queue.add(EnvelopeImpl(nextId++, destination.path, time + delay, message)) + queue.add(EnvelopeImpl(nextId++, path, time + delay, message)) } } diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt index cb75c2ac..867e7c11 100644 --- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt +++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt @@ -28,9 +28,13 @@ import com.atlarge.odcsim.ActorPath import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.ActorSystemFactory import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.coroutines.dsl.timeout +import com.atlarge.odcsim.coroutines.suspending import com.atlarge.odcsim.empty import com.atlarge.odcsim.ignore import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.same import com.atlarge.odcsim.setup import com.atlarge.odcsim.stopped @@ -278,7 +282,7 @@ abstract class ActorSystemContract { val system = factory(setup<ActorRef<Unit>> { ctx -> val root = ctx.self val child = ctx.spawn(setup<Unit> { - val child = ctx.spawn(receiveMessage<Unit> { + val child = it.spawn(receiveMessage<Unit> { throw IllegalStateException("DELIBERATE") }, "child") ctx.send(root, child) @@ -363,6 +367,36 @@ abstract class ActorSystemContract { assertEquals(1, counter) system.terminate() } + + /** + * Test whether an actor can watch for termination. + */ + @Test + fun `should watch for termination`() { + var received = false + val system = factory(setup<Nothing> { ctx -> + val child = ctx.spawn(suspending<Nothing> { + it.timeout(50.0) + stopped() + }, "child") + ctx.watch(child) + + receiveSignal { _, signal -> + when (signal) { + is Terminated -> { + received = true + stopped() + } + else -> + same() + } + } + }, "test") + + system.run() + system.terminate() + assertTrue(received) + } } companion object { diff --git a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt index d11d47a3..047e4c70 100644 --- a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt +++ b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt @@ -81,6 +81,10 @@ internal class ActorContextStub<T : Any>(private val owner: BehaviorTestKitImpl< return true } + override fun watch(target: ActorRef<*>) {} + + override fun unwatch(target: ActorRef<*>) {} + override fun sync(target: ActorRef<*>) {} override fun unsync(target: ActorRef<*>) {} |
