summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt20
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt2
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt8
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt27
4 files changed, 32 insertions, 25 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
index ea68f34b..eac254ec 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt
@@ -28,6 +28,7 @@ import com.atlarge.odcsim.internal.BehaviorInterpreter
import com.atlarge.odcsim.internal.EmptyBehavior
import com.atlarge.odcsim.internal.IgnoreBehavior
import com.atlarge.odcsim.internal.TimerSchedulerImpl
+import com.atlarge.odcsim.internal.sendSignal
/**
* This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors
@@ -137,6 +138,25 @@ fun <T : Any> withTimers(handler: (TimerScheduler<T>) -> Behavior<T>): Behavior<
}
/**
+ * Construct a [Behavior] that waits for the specified duration before constructing the next behavior.
+ *
+ * @param after The delay before constructing the next behavior.
+ * @param handler The handler to construct the behavior with.
+ */
+fun <T : Any> withTimeout(after: Duration, handler: (ActorContext<T>) -> Behavior<T>): Behavior<T> =
+ setup { ctx ->
+ val target = Any()
+ ctx.sendSignal(ctx.self, Timeout(target), after)
+ receiveSignal { _, signal ->
+ if (signal is Timeout && signal.target == target) {
+ handler(ctx)
+ } else {
+ unhandled()
+ }
+ }
+ }
+
+/**
* Join together both [Behavior] with another [Behavior], essentially running them side-by-side, only directly
* propagating stopped behavior.
*
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
index e393c6a1..2ad042e3 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
@@ -54,6 +54,6 @@ data class Timeout(val target: Any) : Signal
* A lifecycle signal to indicate that an actor that was watched has terminated.
*
* @property ref The reference to the actor that has terminated.
- * @property failure The failure that caused the termination, or `null` on graceful termination.
+ * @property failure The failure that caused the termination, or `null` on graceful termination.
*/
data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt
index 7c09734b..e995c0e3 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt
@@ -56,9 +56,11 @@ suspend inline fun <T : Any, reified U : T> SuspendingActorContext<T>.receiveOf(
* @param after The delay after which the message should be received by the actor.
* @param transform The block to transform `self` to a message.
*/
-suspend inline fun <T : Any, U : Any, reified V : T> SuspendingActorContext<T>.ask(ref: ActorRef<U>,
- after: Duration = 0.0,
- transform: (ActorRef<T>) -> U): V {
+suspend inline fun <T : Any, U : Any, reified V : T> SuspendingActorContext<T>.ask(
+ ref: ActorRef<U>,
+ after: Duration = 0.0,
+ transform: (ActorRef<T>) -> U
+): V {
send(ref, transform(self), after)
return receiveOf()
}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
index 694a107c..16b6f534 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
@@ -25,13 +25,8 @@
package com.atlarge.odcsim.coroutines.dsl
import com.atlarge.odcsim.Duration
-import com.atlarge.odcsim.Timeout
-import com.atlarge.odcsim.coroutines.SuspendingActorContext
import com.atlarge.odcsim.coroutines.suspendWithBehavior
-import com.atlarge.odcsim.internal.sendSignal
-import com.atlarge.odcsim.receiveSignal
-import com.atlarge.odcsim.setup
-import com.atlarge.odcsim.unhandled
+import com.atlarge.odcsim.withTimeout
import kotlin.coroutines.resume
/**
@@ -39,19 +34,9 @@ import kotlin.coroutines.resume
*
* @param after The duration after which execution should continue.
*/
-suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) =
- suspendWithBehavior<T, Unit> { cont, next ->
- setup { ctx ->
- val target = this
- @Suppress("UNCHECKED_CAST")
- ctx.sendSignal(ctx.self, Timeout(target), after)
- receiveSignal { _, signal ->
- if (signal is Timeout && signal.target == target) {
- cont.resume(Unit)
- next()
- } else {
- unhandled()
- }
- }
- }
+suspend fun timeout(after: Duration) = suspendWithBehavior<Any, Unit> { cont, next ->
+ withTimeout(after) {
+ cont.resume(Unit)
+ next()
}
+}