diff options
3 files changed, 118 insertions, 18 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt index 2fe97d59..251d0669 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt @@ -44,7 +44,7 @@ interface ActorRef<in T : Any> : Comparable<ActorRef<*>>, Serializable { /** * Unsafe helper method for widening the type accepted by this [ActorRef]. */ -fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> { +fun <U : Any, T : U> ActorRef<T>.unsafeUpcast(): ActorRef<U> { @Suppress("UNCHECKED_CAST") return this as ActorRef<U> } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 411915ef..8298ed28 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -43,18 +43,36 @@ sealed class Behavior<T : Any> { * This is almost always a safe operation, but might cause [ClassCastException] in case a narrowed behavior sends * messages of a different type to itself and is chained via [Behavior.orElse]. */ - fun <U : T> narrow(): Behavior<U> { - @Suppress("UNCHECKED_CAST") - return this as Behavior<U> + fun <U : T> narrow(): Behavior<U> = unsafeCast() + + /** + * Widen the type of this behavior by placing a funnel in front of it. + * + * @param transform The mapping from the widened type to the original type, returning `null` in-case the message + * should not be handled. + */ + fun <U : Any> widen(transform: (U) -> T?): Behavior<U> { + return wrap(this) { interpreter -> + receive<U> { ctx, msg -> + val res = transform(msg) + @Suppress("UNCHECKED_CAST") + if (res == null || interpreter.interpretMessage(ctx as ActorContext<T>, res)) + unhandled() + else + interpreter.behavior.unsafeCast() + }.unsafeCast() + }.unsafeCast() } /** * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the * incoming message or signal. + * + * @param that The fallback behavior. */ - fun orElse(behavior: Behavior<T>): Behavior<T> = + fun orElse(that: Behavior<T>): Behavior<T> = wrap(this) { left -> - wrap(behavior) { right -> + wrap(that) { right -> object : ReceivingBehavior<T>() { override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { if (left.interpretMessage(ctx, msg)) { @@ -78,6 +96,16 @@ sealed class Behavior<T : Any> { } } } + + /** + * Unsafe utility method for changing the type accepted by this [Behavior]. + * Be aware that changing the type might result in [ClassCastException], when sending a message to the resulting + * behavior. + */ + fun <U : Any> unsafeCast(): Behavior<U> { + @Suppress("UNCHECKED_CAST") + return this as Behavior<U> + } } /** diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt index 4dea1007..f2736437 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt @@ -32,27 +32,18 @@ import com.atlarge.odcsim.internal.IgnoreBehavior * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors * then these will be stopped as part of the shutdown procedure. */ -fun <T : Any> stopped(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return StoppedBehavior as Behavior<T> -} +fun <T : Any> stopped(): Behavior<T> = StoppedBehavior.unsafeCast() /** * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. */ -fun <T : Any> same(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return SameBehavior as Behavior<T> -} +fun <T : Any> same(): Behavior<T> = SameBehavior.unsafeCast() /** * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will * reuse the previous behavior. */ -fun <T : Any> unhandled(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return UnhandledBehavior as Behavior<T> -} +fun <T : Any> unhandled(): Behavior<T> = UnhandledBehavior.unsafeCast() /** * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. @@ -84,6 +75,21 @@ fun <T : Any> receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> } /** + * Construct a [Behavior] that reacts to incoming messages of type [U], provides access to the [ActorContext] and + * returns the actor's next behavior. Other messages will be unhandled. + */ +inline fun <T : Any, reified U : T> receiveOf(crossinline handler: (ActorContext<T>, U) -> Behavior<T>): Behavior<T> { + return object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + return if (msg is U) + handler(ctx, msg) + else + unhandled() + } + } +} + +/** * Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior. */ fun <T : Any> receiveMessage(handler: (T) -> Behavior<T>): Behavior<T> { @@ -109,3 +115,69 @@ fun <T : Any> receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { return setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } } + +/** + * Join together both [Behavior] with another [Behavior], essentially running them side-by-side, only directly + * propagating stopped behavior. + * + * @param that The behavior to join with. + */ +fun <T : Any> Behavior<T>.join(that: Behavior<T>): Behavior<T> = + wrap(this) { left -> + wrap(that) { right -> + object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + if (left.interpretMessage(ctx, msg)) { + return left.propagate(this) // Propagate stopped behavior + } else if (right.interpretMessage(ctx, msg)) { + return right.propagate(this) + } + + return unhandled() + } + + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + if (left.interpretSignal(ctx, signal)) { + return left.propagate(this) + } else if (right.interpretSignal(ctx, signal)) { + return right.propagate(this) + } + + return unhandled() + } + } + } + } + +/** + * Widen the type of messages the [Behavior] by marking all other messages as unhandled. + */ +inline fun <U : Any, reified T : U> Behavior<T>.widen(): Behavior<U> = widen { + if (it is T) + it + else + null +} + +/** + * Keep the specified [Behavior] alive if it returns the stopped behavior. + */ +fun <T : Any> Behavior<T>.keepAlive(): Behavior<T> = + wrap(this) { interpreter -> + object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + if (interpreter.interpretMessage(ctx, msg)) { + return this + } + return empty() + } + + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + if (interpreter.interpretSignal(ctx, signal)) { + return this + } + + return empty() + } + } + } |
