diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-06 18:21:13 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-14 12:55:56 +0200 |
| commit | 8a4972730965506cee3cbbdeb2b735454d4b6500 (patch) | |
| tree | 86c5373c07faef732a41401e7cea09d267b3c6d6 /odcsim-core/src/main | |
| parent | 5b48cdbad2493c7af9e79bb9996f195ace3123e5 (diff) | |
feat: Add support for watching actor termination
This change adds support for tracking termination of actors in the
system.
Diffstat (limited to 'odcsim-core/src/main')
5 files changed, 35 insertions, 4 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index 3a704f32..8ea4a09d 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -84,6 +84,23 @@ interface ActorContext<T : Any> { fun stop(child: ActorRef<*>): Boolean /** + * Watch the specified [ActorRef] for termination of the referenced actor. On termination of the watched actor, + * a [Terminated] signal is sent to this actor. + * + * @param target The target actor to watch. + */ + fun watch(target: ActorRef<*>) + + /** + * Revoke the registration established by [watch]. + * + * In case there exists no registration for the specified [target], no action will be performed. + * + * @param target The target actor to unwatch. + */ + fun unwatch(target: ActorRef<*>) + + /** * Synchronize the local virtual time of this target with the other referenced actor's local virtual time. * * By default, actors are not guaranteed to be synchronized, meaning that for some implementations, virtual time may diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 8298ed28..9ad7f83f 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -167,7 +167,7 @@ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior /** * A flag to indicate whether the last message/signal went unhandled. */ -val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior +val <T : Any> Behavior<T>.isUnhandled get() = this is UnhandledBehavior // The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending // it. diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt index 35aad18b..e393c6a1 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt @@ -48,4 +48,12 @@ object PostStop : Signal * * @property target The target object that has timed out. */ -class Timeout(val target: Any) : Signal +data class Timeout(val target: Any) : Signal + +/** + * A lifecycle signal to indicate that an actor that was watched has terminated. + * + * @property ref The reference to the actor that has terminated. + * @property failure The failure that caused the termination, or `null` on graceful termination. + */ +data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt index 1d91f5b8..694a107c 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -28,6 +28,7 @@ import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Timeout import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.internal.sendSignal import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.setup import com.atlarge.odcsim.unhandled @@ -43,7 +44,7 @@ suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) = setup { ctx -> val target = this @Suppress("UNCHECKED_CAST") - ctx.send(ctx.self, Timeout(target) as T, after) + ctx.sendSignal(ctx.self, Timeout(target), after) receiveSignal { _, signal -> if (signal is Timeout && signal.target == target) { cont.resume(Unit) diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 8d533e9e..a9e20d0e 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -68,7 +68,8 @@ interface SuspendingActorContextImpl<T : Any> : SuspendingActorContext<T> { */ internal class SuspendingBehaviorImpl<T : Any>( private var actorContext: ActorContext<T>, - initialBehavior: SuspendingBehavior<T>) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { + initialBehavior: SuspendingBehavior<T> +) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { /** * The next behavior to use. @@ -106,6 +107,10 @@ internal class SuspendingBehaviorImpl<T : Any>( override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun watch(target: ActorRef<*>) = actorContext.watch(target) + + override fun unwatch(target: ActorRef<*>) = actorContext.unwatch(target) + override fun sync(target: ActorRef<*>) = actorContext.sync(target) override fun unsync(target: ActorRef<*>) = actorContext.unsync(target) |
