diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-04-24 15:24:34 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-13 20:26:46 +0200 |
| commit | 7e87f2964595ee3cfab84db84c3c7efb66a89d15 (patch) | |
| tree | 5b1b5cd7afe321784c34e0032b2f0d9f40349f48 /odcsim-core/src/main | |
| parent | f0a8f3906d6f4d94900117b4d9f0bd9e58f33e10 (diff) | |
refactor: Require ActorContext for sending messages
This change makes it mandatory to send messages to an actor via the
sender's ActorContext in order for the engine to maintain consistency
in time between actors.
Diffstat (limited to 'odcsim-core/src/main')
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt | 15 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt | 19 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt | 8 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt | 101 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt (renamed from odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt) | 55 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt | 36 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt | 64 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt | 22 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt (renamed from odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt) | 14 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt | 46 |
10 files changed, 232 insertions, 148 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index deccedd1..d5edd93f 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -41,6 +41,20 @@ interface ActorContext<T : Any> { val time: Instant /** + * Send the specified message to the actor referenced by this [ActorRef]. + * + * Please note that callees must guarantee that messages are sent strictly in increasing time. + * If so, this method guarantees that: + * - A message will never be received earlier than specified + * - A message might arrive later than specified if the two actors are not synchronized. + * + * @param ref The actor to send the message to. + * @param msg The message to send to the referenced actor. + * @param after The delay after which the message should be received by the actor. + */ + fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration = 0.0) + + /** * Spawn a child actor from the given [Behavior] and with the specified name. * * @param behavior The behavior of the child actor to spawn. @@ -94,3 +108,4 @@ interface ActorContext<T : Any> { */ fun isSync(target: ActorRef<*>): Boolean } + diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt index bc81813b..71b10325 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt @@ -32,23 +32,12 @@ interface ActorRef<in T : Any> { * The path for this actor (from this actor up to the root actor). */ val path: ActorPath - - /** - * Send the specified message to the actor referenced by this [ActorRef]. - * - * Please note that callees must guarantee that messages are sent strictly in increasing time. - * If so, this method guarantees that: - * - A message will never be received earlier than specified - * - A message might arrive later than specified if the two actors are not synchronized. - * - * @param msg The message to send to the referenced actor. - * @param after The delay after which the message should be received by the actor. - */ - fun send(msg: T, after: Duration = 0.1) } /** * Unsafe helper method for widening the type accepted by this [ActorRef]. */ -@Suppress("UNCHECKED_CAST") -fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> = this as ActorRef<U> +fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> { + @Suppress("UNCHECKED_CAST") + return this as ActorRef<U> +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt index a67f13d9..cbb80541 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt @@ -49,4 +49,12 @@ interface ActorSystem<in T : Any> : ActorRef<T> { * @param until The point until which the simulation should run. */ fun run(until: Duration = Duration.POSITIVE_INFINITY) + + /** + * Send the specified message to the root actor of this [ActorSystem]. + * + * @param msg The message to send to the referenced actor. + * @param after The delay after which the message should be received by the actor. + */ + fun send(msg: T, after: Duration = 0.1) } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 1bc41b44..411915ef 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -24,8 +24,6 @@ package com.atlarge.odcsim -import com.atlarge.odcsim.dsl.wrap - /** * The representation of the behavior of an actor. * @@ -51,35 +49,35 @@ sealed class Behavior<T : Any> { } /** - * Companion object used to bind factory methods to. + * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the + * incoming message or signal. */ - companion object { - /** - * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors - * then these will be stopped as part of the shutdown procedure. - */ - fun <T : Any> stopped(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return StoppedBehavior as Behavior<T> - } + fun orElse(behavior: Behavior<T>): Behavior<T> = + wrap(this) { left -> + wrap(behavior) { right -> + object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + if (left.interpretMessage(ctx, msg)) { + return left.behavior + } else if (right.interpretMessage(ctx, msg)) { + return right.behavior + } + + return unhandled() + } - /** - * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. - */ - fun <T : Any> same(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return SameBehavior as Behavior<T> - } + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + if (left.interpretSignal(ctx, signal)) { + return left.behavior + } else if (right.interpretSignal(ctx, signal)) { + return right.behavior + } - /** - * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will - * reuse the previous behavior. - */ - fun <T : Any> unhandled(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return UnhandledBehavior as Behavior<T> + return unhandled() + } + } + } } - } } /** @@ -103,68 +101,37 @@ abstract class DeferredBehavior<T : Any> : Behavior<T>() { * The message may either be of the type that the actor declares and which is part of the [ActorRef] signature, * or it may be a system [Signal] that expresses a lifecycle event of either this actor or one of its child actors. * - * @param T The shape of the messages the behavior accepts.* + * @param T The shape of the messages the behavior accepts. */ abstract class ReceivingBehavior<T : Any> : Behavior<T>() { /** * Process an incoming message of type [T] and return the actor's next behavior. * * The returned behavior can in addition to normal behaviors be one of the canned special objects: - * - returning [Behavior.Companion.stopped] will terminate this Behavior - * - returning [Behavior.Companion.same] designates to reuse the current Behavior - * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * - returning [stopped] will terminate this Behavior + * - returning [same] designates to reuse the current Behavior + * - returning [unhandled] keeps the same Behavior and signals that the message was not yet handled * * @param ctx The [ActorContext] in which the actor is currently running. * @param msg The message that was received. */ - open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = Behavior.unhandled() + open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = unhandled() /** * Process an incoming [Signal] and return the actor's next behavior. * * The returned behavior can in addition to normal behaviors be one of the canned special objects: - * - returning [Behavior.Companion.stopped] will terminate this Behavior - * - returning [Behavior.Companion.same] designates to reuse the current Behavior - * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * - returning [stopped] will terminate this Behavior + * - returning [same] designates to reuse the current Behavior + * - returning [unhandled] keeps the same Behavior and signals that the message was not yet handled * * @param ctx The [ActorContext] in which the actor is currently running. * @param signal The [Signal] that was received. */ - open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = Behavior.unhandled() + open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = unhandled() } /** - * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the incoming - * message or signal. - */ -fun <T : Any> Behavior<T>.orElse(behavior: Behavior<T>): Behavior<T> = - Behavior.wrap(this) { left -> - Behavior.wrap(behavior) { right -> - object : ReceivingBehavior<T>() { - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { - if (left.interpretMessage(ctx, msg)) { - return left.behavior - } else if (right.interpretMessage(ctx, msg)) { - return right.behavior - } - - return Behavior.unhandled() - } - - override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { - if (left.interpretSignal(ctx, signal)) { - return left.behavior - } else if (right.interpretSignal(ctx, signal)) { - return right.behavior - } - - return Behavior.unhandled() - } - } - } - } - -/** * A flag to indicate whether a [Behavior] instance is still alive. */ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt index d4b092d2..4dea1007 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2018 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,22 +21,43 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ +@file:JvmName("Behaviors") +package com.atlarge.odcsim -package com.atlarge.odcsim.dsl - -import com.atlarge.odcsim.ActorContext -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.DeferredBehavior -import com.atlarge.odcsim.ReceivingBehavior -import com.atlarge.odcsim.Signal import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.EmptyBehavior import com.atlarge.odcsim.internal.IgnoreBehavior /** + * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors + * then these will be stopped as part of the shutdown procedure. + */ +fun <T : Any> stopped(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return StoppedBehavior as Behavior<T> +} + +/** + * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. + */ +fun <T : Any> same(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return SameBehavior as Behavior<T> +} + +/** + * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will + * reuse the previous behavior. + */ +fun <T : Any> unhandled(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return UnhandledBehavior as Behavior<T> +} + +/** * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. */ -fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { +fun <T : Any> setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { return object : DeferredBehavior<T>() { override fun invoke(ctx: ActorContext<T>): Behavior<T> = block(ctx) } @@ -45,18 +66,18 @@ fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): /** * A [Behavior] that ignores any incoming message or signal and keeps the same behavior. */ -fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow() +fun <T : Any> ignore(): Behavior<T> = IgnoreBehavior.narrow() /** * A [Behavior] that treats every incoming message or signal as unhandled. */ -fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow() +fun <T : Any> empty(): Behavior<T> = EmptyBehavior.narrow() /** * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the * actor's next behavior. */ -fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { +fun <T : Any> receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(ctx, msg) } @@ -65,9 +86,9 @@ fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavi /** * Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior. */ -fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): Behavior<T> { +fun <T : Any> receiveMessage(handler: (T) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = onMessage(msg) + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(msg) } } @@ -75,7 +96,7 @@ fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): * Construct a [Behavior] that reacts to incoming signals, provides access to the [ActorContext] and returns the * actor's next behavior. */ -fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { +fun <T : Any> receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = handler(ctx, signal) } @@ -85,6 +106,6 @@ fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal * Construct a [Behavior] that wraps another behavior instance and uses a [BehaviorInterpreter] to pass incoming * messages and signals to the wrapper behavior. */ -fun <T : Any> Behavior.Companion.wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { - return Behavior.setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } +fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { + return setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt index 20aab232..b963cdb6 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt @@ -33,6 +33,7 @@ import com.atlarge.odcsim.internal.SuspendingBehaviorImpl import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.suspendCoroutine +import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn /** * A [Behavior] that allows method calls to suspend execution via Kotlin coroutines. @@ -41,23 +42,15 @@ import kotlin.coroutines.suspendCoroutine */ abstract class SuspendingBehavior<T : Any> : DeferredBehavior<T>() { /** - * Run the logic of this behavior. + * Run the suspending logic of this behavior. * * @param ctx The [SuspendingActorContext] in which the behavior is executed. + * @return The next behavior for the actor. */ - abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>) + abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>): Behavior<T> // Immediately transfer to implementation - override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this) -} - -/** - * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. - */ -fun <T : Any> Behavior.Companion.suspending(block: suspend (SuspendingActorContext<T>) -> Unit): Behavior<T> { - return object : SuspendingBehavior<T>() { - override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) - } + override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this).start() } /** @@ -102,3 +95,22 @@ suspend fun <T : Any, U> suspendWithBehavior(block: (Continuation<U>, () -> Beha ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") ctx.become(block(cont) { ctx.behavior }) } + +/** + * Obtain the current [SuspendingActorContext] instance for the active continuation. + */ +suspend fun <T : Any> actorContext(): SuspendingActorContext<T> = + suspendCoroutineUninterceptedOrReturn { cont -> + @Suppress("UNCHECKED_CAST") + cont.context[SuspendingActorContext] as? SuspendingActorContext<T> + ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") + } + +/** + * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. + */ +fun <T : Any> suspending(block: suspend (SuspendingActorContext<T>) -> Behavior<T>): Behavior<T> { + return object : SuspendingBehavior<T>() { + override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) + } +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt new file mode 100644 index 00000000..7c09734b --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines.dsl + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.unhandled +import kotlin.coroutines.resume + +/** + * Receive only messages of type [U] and mark all other messages as unhandled. + * + * @return The received message. + */ +suspend inline fun <T : Any, reified U : T> SuspendingActorContext<T>.receiveOf(): U = + suspendWithBehavior<T, U> { cont, next -> + receiveMessage { msg -> + if (msg is U) { + cont.resume(msg) + next() + } else { + unhandled() + } + } + } + +/** + * Send the specified message to the given reference and wait for a reply. + * + * @param ref The actor to send the message to. + * @param after The delay after which the message should be received by the actor. + * @param transform The block to transform `self` to a message. + */ +suspend inline fun <T : Any, U : Any, reified V : T> SuspendingActorContext<T>.ask(ref: ActorRef<U>, + after: Duration = 0.0, + transform: (ActorRef<T>) -> U): V { + send(ref, transform(self), after) + return receiveOf() +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt index 832045a4..45cbce69 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -24,15 +24,13 @@ package com.atlarge.odcsim.coroutines.dsl -import com.atlarge.odcsim.ActorContext -import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.Duration -import com.atlarge.odcsim.ReceivingBehavior -import com.atlarge.odcsim.Signal import com.atlarge.odcsim.Timeout import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.suspendWithBehavior -import com.atlarge.odcsim.internal.send +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.receiveSignal +import com.atlarge.odcsim.unhandled import kotlin.coroutines.resume /** @@ -42,17 +40,17 @@ import kotlin.coroutines.resume */ suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) = suspendWithBehavior<T, Unit> { cont, next -> - object : ReceivingBehavior<T>() { - init { - self.send(Timeout(this), after) - } - - override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = - if (signal is Timeout && signal.target == this) { + setup { ctx -> + val target = this + @Suppress("UNCHECKED_CAST") + ctx.send(ctx.self, Timeout(target) as T, after) + receiveSignal { _, signal -> + if (signal is Timeout && signal.target == target) { cont.resume(Unit) next() } else { unhandled() } + } } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt index 7653431c..f1aba25e 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2018 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,14 +24,20 @@ package com.atlarge.odcsim.internal +import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Signal /** - * Send the specified system [Signal] to the given [ActorRef]. + * Send the specified [Signal] to the given actor reference after the specified duration. + * + * @param ref The actor to send the signal to. + * @param signal The signal to send to the referenced actor. + * @param after The delay after which the signal should be received by the actor. */ -fun <T : Any> ActorRef<T>.send(signal: Signal, duration: Duration) { +fun ActorContext<*>.sendSignal(ref: ActorRef<*>, signal: Signal, after: Duration = 0.0) { + // Signals are currently processed as regular messages @Suppress("UNCHECKED_CAST") - send(signal as T, duration) + send(ref as ActorRef<Any>, signal, after) } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 7f2d39bc..70294ae5 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -27,14 +27,15 @@ package com.atlarge.odcsim.internal import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Instant import com.atlarge.odcsim.ReceivingBehavior import com.atlarge.odcsim.Signal import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.SuspendingBehavior -import com.atlarge.odcsim.dsl.receiveMessage -import com.atlarge.odcsim.dsl.receiveSignal +import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.receiveMessage import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -63,8 +64,10 @@ interface SuspendingActorContextImpl<T : Any> : SuspendingActorContext<T> { * interface. * This implementation uses the fact that each actor is thread-safe (as it processes its mailbox sequentially). */ -internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, initialBehavior: SuspendingBehavior<T>) : - ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { +internal class SuspendingBehaviorImpl<T : Any>( + private var actorContext: ActorContext<T>, + initialBehavior: SuspendingBehavior<T>) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { + /** * The next behavior to use. */ @@ -75,16 +78,6 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in */ private val interpreter = BehaviorInterpreter(initialBehavior) - /** - * The current active [ActorContext]. - */ - private var actorContext: ActorContext<T> - - init { - this.actorContext = actorContext - this.start() - } - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { this.actorContext = ctx return interpreter.also { it.interpretMessage(ctx, msg) }.propagate(next) @@ -99,10 +92,18 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in override val time: Instant get() = actorContext.time + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = actorContext.send(ref, msg, after) + override fun <U : Any> spawn(behavior: Behavior<U>, name: String) = actorContext.spawn(behavior, name) override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun sync(target: ActorRef<*>) = actorContext.sync(target) + + override fun unsync(target: ActorRef<*>) = actorContext.unsync(target) + + override fun isSync(target: ActorRef<*>): Boolean = actorContext.isSync(target) + override suspend fun receive(): T = suspendWithBehavior<T, T> { cont, next -> receiveMessage { msg -> cont.resume(msg) @@ -128,10 +129,11 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in /** * Start the suspending behavior. */ - private fun start() { + internal fun start(): Behavior<T> { val behavior = interpreter.behavior as SuspendingBehavior<T> - val block: suspend () -> Unit = { behavior(this) } + val block: suspend () -> Behavior<T> = { behavior(this) } block.startCoroutine(SuspendingBehaviorImplContinuation()) + return next } /** @@ -144,13 +146,15 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in /** * The continuation of suspending behavior. */ - private inner class SuspendingBehaviorImplContinuation : Continuation<Unit> { + private inner class SuspendingBehaviorImplContinuation : Continuation<Behavior<T>> { override val context = this@SuspendingBehaviorImpl - override fun resumeWith(result: Result<Unit>) { - // TODO Add some form of error handling here - stop() - next = Behavior.stopped() + override fun resumeWith(result: Result<Behavior<T>>) { + if (result.isSuccess) { + next = result.getOrNull()!! + } else if (result.isFailure) { + throw result.exceptionOrNull()!! + } } } } |
