From 8a4972730965506cee3cbbdeb2b735454d4b6500 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Mon, 6 May 2019 18:21:13 +0200 Subject: feat: Add support for watching actor termination This change adds support for tracking termination of actors in the system. --- .../src/main/kotlin/com/atlarge/odcsim/ActorContext.kt | 17 +++++++++++++++++ .../src/main/kotlin/com/atlarge/odcsim/Behavior.kt | 2 +- .../src/main/kotlin/com/atlarge/odcsim/Signals.kt | 10 +++++++++- .../kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt | 3 ++- .../kotlin/com/atlarge/odcsim/internal/Coroutines.kt | 7 ++++++- 5 files changed, 35 insertions(+), 4 deletions(-) (limited to 'odcsim-core/src/main') diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index 3a704f32..8ea4a09d 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -83,6 +83,23 @@ interface ActorContext { */ fun stop(child: ActorRef<*>): Boolean + /** + * Watch the specified [ActorRef] for termination of the referenced actor. On termination of the watched actor, + * a [Terminated] signal is sent to this actor. + * + * @param target The target actor to watch. + */ + fun watch(target: ActorRef<*>) + + /** + * Revoke the registration established by [watch]. + * + * In case there exists no registration for the specified [target], no action will be performed. + * + * @param target The target actor to unwatch. + */ + fun unwatch(target: ActorRef<*>) + /** * Synchronize the local virtual time of this target with the other referenced actor's local virtual time. * diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 8298ed28..9ad7f83f 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -167,7 +167,7 @@ val Behavior.isAlive get() = this !is StoppedBehavior /** * A flag to indicate whether the last message/signal went unhandled. */ -val Behavior.isUnhandled get() = this !is UnhandledBehavior +val Behavior.isUnhandled get() = this is UnhandledBehavior // The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending // it. diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt index 35aad18b..e393c6a1 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt @@ -48,4 +48,12 @@ object PostStop : Signal * * @property target The target object that has timed out. */ -class Timeout(val target: Any) : Signal +data class Timeout(val target: Any) : Signal + +/** + * A lifecycle signal to indicate that an actor that was watched has terminated. + * + * @property ref The reference to the actor that has terminated. + * @property failure The failure that caused the termination, or `null` on graceful termination. + */ +data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt index 1d91f5b8..694a107c 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -28,6 +28,7 @@ import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Timeout import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.internal.sendSignal import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.setup import com.atlarge.odcsim.unhandled @@ -43,7 +44,7 @@ suspend fun SuspendingActorContext.timeout(after: Duration) = setup { ctx -> val target = this @Suppress("UNCHECKED_CAST") - ctx.send(ctx.self, Timeout(target) as T, after) + ctx.sendSignal(ctx.self, Timeout(target), after) receiveSignal { _, signal -> if (signal is Timeout && signal.target == target) { cont.resume(Unit) diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 8d533e9e..a9e20d0e 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -68,7 +68,8 @@ interface SuspendingActorContextImpl : SuspendingActorContext { */ internal class SuspendingBehaviorImpl( private var actorContext: ActorContext, - initialBehavior: SuspendingBehavior) : ReceivingBehavior(), SuspendingActorContextImpl { + initialBehavior: SuspendingBehavior +) : ReceivingBehavior(), SuspendingActorContextImpl { /** * The next behavior to use. @@ -106,6 +107,10 @@ internal class SuspendingBehaviorImpl( override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun watch(target: ActorRef<*>) = actorContext.watch(target) + + override fun unwatch(target: ActorRef<*>) = actorContext.unwatch(target) + override fun sync(target: ActorRef<*>) = actorContext.sync(target) override fun unsync(target: ActorRef<*>) = actorContext.unsync(target) -- cgit v1.2.3