summaryrefslogtreecommitdiff
path: root/odcsim-core
diff options
context:
space:
mode:
Diffstat (limited to 'odcsim-core')
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt2
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt38
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt96
3 files changed, 118 insertions, 18 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt
index 2fe97d59..251d0669 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt
@@ -44,7 +44,7 @@ interface ActorRef<in T : Any> : Comparable<ActorRef<*>>, Serializable {
/**
* Unsafe helper method for widening the type accepted by this [ActorRef].
*/
-fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> {
+fun <U : Any, T : U> ActorRef<T>.unsafeUpcast(): ActorRef<U> {
@Suppress("UNCHECKED_CAST")
return this as ActorRef<U>
}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
index 411915ef..8298ed28 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
@@ -43,18 +43,36 @@ sealed class Behavior<T : Any> {
* This is almost always a safe operation, but might cause [ClassCastException] in case a narrowed behavior sends
* messages of a different type to itself and is chained via [Behavior.orElse].
*/
- fun <U : T> narrow(): Behavior<U> {
- @Suppress("UNCHECKED_CAST")
- return this as Behavior<U>
+ fun <U : T> narrow(): Behavior<U> = unsafeCast()
+
+ /**
+ * Widen the type of this behavior by placing a funnel in front of it.
+ *
+ * @param transform The mapping from the widened type to the original type, returning `null` in-case the message
+ * should not be handled.
+ */
+ fun <U : Any> widen(transform: (U) -> T?): Behavior<U> {
+ return wrap(this) { interpreter ->
+ receive<U> { ctx, msg ->
+ val res = transform(msg)
+ @Suppress("UNCHECKED_CAST")
+ if (res == null || interpreter.interpretMessage(ctx as ActorContext<T>, res))
+ unhandled()
+ else
+ interpreter.behavior.unsafeCast()
+ }.unsafeCast()
+ }.unsafeCast()
}
/**
* Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the
* incoming message or signal.
+ *
+ * @param that The fallback behavior.
*/
- fun orElse(behavior: Behavior<T>): Behavior<T> =
+ fun orElse(that: Behavior<T>): Behavior<T> =
wrap(this) { left ->
- wrap(behavior) { right ->
+ wrap(that) { right ->
object : ReceivingBehavior<T>() {
override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> {
if (left.interpretMessage(ctx, msg)) {
@@ -78,6 +96,16 @@ sealed class Behavior<T : Any> {
}
}
}
+
+ /**
+ * Unsafe utility method for changing the type accepted by this [Behavior].
+ * Be aware that changing the type might result in [ClassCastException], when sending a message to the resulting
+ * behavior.
+ */
+ fun <U : Any> unsafeCast(): Behavior<U> {
+ @Suppress("UNCHECKED_CAST")
+ return this as Behavior<U>
+ }
}
/**
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
index 4dea1007..f2736437 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
@@ -32,27 +32,18 @@ import com.atlarge.odcsim.internal.IgnoreBehavior
* This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors
* then these will be stopped as part of the shutdown procedure.
*/
-fun <T : Any> stopped(): Behavior<T> {
- @Suppress("UNCHECKED_CAST")
- return StoppedBehavior as Behavior<T>
-}
+fun <T : Any> stopped(): Behavior<T> = StoppedBehavior.unsafeCast()
/**
* This [Behavior] is used to signal that this actor wants to reuse its previous behavior.
*/
-fun <T : Any> same(): Behavior<T> {
- @Suppress("UNCHECKED_CAST")
- return SameBehavior as Behavior<T>
-}
+fun <T : Any> same(): Behavior<T> = SameBehavior.unsafeCast()
/**
* This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will
* reuse the previous behavior.
*/
-fun <T : Any> unhandled(): Behavior<T> {
- @Suppress("UNCHECKED_CAST")
- return UnhandledBehavior as Behavior<T>
-}
+fun <T : Any> unhandled(): Behavior<T> = UnhandledBehavior.unsafeCast()
/**
* A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started.
@@ -84,6 +75,21 @@ fun <T : Any> receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T>
}
/**
+ * Construct a [Behavior] that reacts to incoming messages of type [U], provides access to the [ActorContext] and
+ * returns the actor's next behavior. Other messages will be unhandled.
+ */
+inline fun <T : Any, reified U : T> receiveOf(crossinline handler: (ActorContext<T>, U) -> Behavior<T>): Behavior<T> {
+ return object : ReceivingBehavior<T>() {
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> {
+ return if (msg is U)
+ handler(ctx, msg)
+ else
+ unhandled()
+ }
+ }
+}
+
+/**
* Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior.
*/
fun <T : Any> receiveMessage(handler: (T) -> Behavior<T>): Behavior<T> {
@@ -109,3 +115,69 @@ fun <T : Any> receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>):
fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> {
return setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) }
}
+
+/**
+ * Join together both [Behavior] with another [Behavior], essentially running them side-by-side, only directly
+ * propagating stopped behavior.
+ *
+ * @param that The behavior to join with.
+ */
+fun <T : Any> Behavior<T>.join(that: Behavior<T>): Behavior<T> =
+ wrap(this) { left ->
+ wrap(that) { right ->
+ object : ReceivingBehavior<T>() {
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> {
+ if (left.interpretMessage(ctx, msg)) {
+ return left.propagate(this) // Propagate stopped behavior
+ } else if (right.interpretMessage(ctx, msg)) {
+ return right.propagate(this)
+ }
+
+ return unhandled()
+ }
+
+ override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> {
+ if (left.interpretSignal(ctx, signal)) {
+ return left.propagate(this)
+ } else if (right.interpretSignal(ctx, signal)) {
+ return right.propagate(this)
+ }
+
+ return unhandled()
+ }
+ }
+ }
+ }
+
+/**
+ * Widen the type of messages the [Behavior] by marking all other messages as unhandled.
+ */
+inline fun <U : Any, reified T : U> Behavior<T>.widen(): Behavior<U> = widen {
+ if (it is T)
+ it
+ else
+ null
+}
+
+/**
+ * Keep the specified [Behavior] alive if it returns the stopped behavior.
+ */
+fun <T : Any> Behavior<T>.keepAlive(): Behavior<T> =
+ wrap(this) { interpreter ->
+ object : ReceivingBehavior<T>() {
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> {
+ if (interpreter.interpretMessage(ctx, msg)) {
+ return this
+ }
+ return empty()
+ }
+
+ override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> {
+ if (interpreter.interpretSignal(ctx, signal)) {
+ return this
+ }
+
+ return empty()
+ }
+ }
+ }