diff options
Diffstat (limited to 'odcsim-core/src/main')
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt | 15 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt | 19 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt | 8 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt | 101 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt (renamed from odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt) | 55 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt | 36 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt | 64 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt | 22 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt (renamed from odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt) | 14 | ||||
| -rw-r--r-- | odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt | 46 |
10 files changed, 232 insertions, 148 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index deccedd1..d5edd93f 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -41,6 +41,20 @@ interface ActorContext<T : Any> { val time: Instant /** + * Send the specified message to the actor referenced by this [ActorRef]. + * + * Please note that callees must guarantee that messages are sent strictly in increasing time. + * If so, this method guarantees that: + * - A message will never be received earlier than specified + * - A message might arrive later than specified if the two actors are not synchronized. + * + * @param ref The actor to send the message to. + * @param msg The message to send to the referenced actor. + * @param after The delay after which the message should be received by the actor. + */ + fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration = 0.0) + + /** * Spawn a child actor from the given [Behavior] and with the specified name. * * @param behavior The behavior of the child actor to spawn. @@ -94,3 +108,4 @@ interface ActorContext<T : Any> { */ fun isSync(target: ActorRef<*>): Boolean } + diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt index bc81813b..71b10325 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt @@ -32,23 +32,12 @@ interface ActorRef<in T : Any> { * The path for this actor (from this actor up to the root actor). */ val path: ActorPath - - /** - * Send the specified message to the actor referenced by this [ActorRef]. - * - * Please note that callees must guarantee that messages are sent strictly in increasing time. - * If so, this method guarantees that: - * - A message will never be received earlier than specified - * - A message might arrive later than specified if the two actors are not synchronized. - * - * @param msg The message to send to the referenced actor. - * @param after The delay after which the message should be received by the actor. - */ - fun send(msg: T, after: Duration = 0.1) } /** * Unsafe helper method for widening the type accepted by this [ActorRef]. */ -@Suppress("UNCHECKED_CAST") -fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> = this as ActorRef<U> +fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> { + @Suppress("UNCHECKED_CAST") + return this as ActorRef<U> +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt index a67f13d9..cbb80541 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt @@ -49,4 +49,12 @@ interface ActorSystem<in T : Any> : ActorRef<T> { * @param until The point until which the simulation should run. */ fun run(until: Duration = Duration.POSITIVE_INFINITY) + + /** + * Send the specified message to the root actor of this [ActorSystem]. + * + * @param msg The message to send to the referenced actor. + * @param after The delay after which the message should be received by the actor. + */ + fun send(msg: T, after: Duration = 0.1) } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 1bc41b44..411915ef 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -24,8 +24,6 @@ package com.atlarge.odcsim -import com.atlarge.odcsim.dsl.wrap - /** * The representation of the behavior of an actor. * @@ -51,35 +49,35 @@ sealed class Behavior<T : Any> { } /** - * Companion object used to bind factory methods to. + * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the + * incoming message or signal. */ - companion object { - /** - * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors - * then these will be stopped as part of the shutdown procedure. - */ - fun <T : Any> stopped(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return StoppedBehavior as Behavior<T> - } + fun orElse(behavior: Behavior<T>): Behavior<T> = + wrap(this) { left -> + wrap(behavior) { right -> + object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + if (left.interpretMessage(ctx, msg)) { + return left.behavior + } else if (right.interpretMessage(ctx, msg)) { + return right.behavior + } + + return unhandled() + } - /** - * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. - */ - fun <T : Any> same(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return SameBehavior as Behavior<T> - } + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + if (left.interpretSignal(ctx, signal)) { + return left.behavior + } else if (right.interpretSignal(ctx, signal)) { + return right.behavior + } - /** - * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will - * reuse the previous behavior. - */ - fun <T : Any> unhandled(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return UnhandledBehavior as Behavior<T> + return unhandled() + } + } + } } - } } /** @@ -103,68 +101,37 @@ abstract class DeferredBehavior<T : Any> : Behavior<T>() { * The message may either be of the type that the actor declares and which is part of the [ActorRef] signature, * or it may be a system [Signal] that expresses a lifecycle event of either this actor or one of its child actors. * - * @param T The shape of the messages the behavior accepts.* + * @param T The shape of the messages the behavior accepts. */ abstract class ReceivingBehavior<T : Any> : Behavior<T>() { /** * Process an incoming message of type [T] and return the actor's next behavior. * * The returned behavior can in addition to normal behaviors be one of the canned special objects: - * - returning [Behavior.Companion.stopped] will terminate this Behavior - * - returning [Behavior.Companion.same] designates to reuse the current Behavior - * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * - returning [stopped] will terminate this Behavior + * - returning [same] designates to reuse the current Behavior + * - returning [unhandled] keeps the same Behavior and signals that the message was not yet handled * * @param ctx The [ActorContext] in which the actor is currently running. * @param msg The message that was received. */ - open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = Behavior.unhandled() + open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = unhandled() /** * Process an incoming [Signal] and return the actor's next behavior. * * The returned behavior can in addition to normal behaviors be one of the canned special objects: - * - returning [Behavior.Companion.stopped] will terminate this Behavior - * - returning [Behavior.Companion.same] designates to reuse the current Behavior - * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * - returning [stopped] will terminate this Behavior + * - returning [same] designates to reuse the current Behavior + * - returning [unhandled] keeps the same Behavior and signals that the message was not yet handled * * @param ctx The [ActorContext] in which the actor is currently running. * @param signal The [Signal] that was received. */ - open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = Behavior.unhandled() + open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = unhandled() } /** - * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the incoming - * message or signal. - */ -fun <T : Any> Behavior<T>.orElse(behavior: Behavior<T>): Behavior<T> = - Behavior.wrap(this) { left -> - Behavior.wrap(behavior) { right -> - object : ReceivingBehavior<T>() { - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { - if (left.interpretMessage(ctx, msg)) { - return left.behavior - } else if (right.interpretMessage(ctx, msg)) { - return right.behavior - } - - return Behavior.unhandled() - } - - override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { - if (left.interpretSignal(ctx, signal)) { - return left.behavior - } else if (right.interpretSignal(ctx, signal)) { - return right.behavior - } - - return Behavior.unhandled() - } - } - } - } - -/** * A flag to indicate whether a [Behavior] instance is still alive. */ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt index d4b092d2..4dea1007 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2018 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,22 +21,43 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ +@file:JvmName("Behaviors") +package com.atlarge.odcsim -package com.atlarge.odcsim.dsl - -import com.atlarge.odcsim.ActorContext -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.DeferredBehavior -import com.atlarge.odcsim.ReceivingBehavior -import com.atlarge.odcsim.Signal import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.EmptyBehavior import com.atlarge.odcsim.internal.IgnoreBehavior /** + * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors + * then these will be stopped as part of the shutdown procedure. + */ +fun <T : Any> stopped(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return StoppedBehavior as Behavior<T> +} + +/** + * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. + */ +fun <T : Any> same(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return SameBehavior as Behavior<T> +} + +/** + * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will + * reuse the previous behavior. + */ +fun <T : Any> unhandled(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return UnhandledBehavior as Behavior<T> +} + +/** * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. */ -fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { +fun <T : Any> setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { return object : DeferredBehavior<T>() { override fun invoke(ctx: ActorContext<T>): Behavior<T> = block(ctx) } @@ -45,18 +66,18 @@ fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): /** * A [Behavior] that ignores any incoming message or signal and keeps the same behavior. */ -fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow() +fun <T : Any> ignore(): Behavior<T> = IgnoreBehavior.narrow() /** * A [Behavior] that treats every incoming message or signal as unhandled. */ -fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow() +fun <T : Any> empty(): Behavior<T> = EmptyBehavior.narrow() /** * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the * actor's next behavior. */ -fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { +fun <T : Any> receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(ctx, msg) } @@ -65,9 +86,9 @@ fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavi /** * Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior. */ -fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): Behavior<T> { +fun <T : Any> receiveMessage(handler: (T) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = onMessage(msg) + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(msg) } } @@ -75,7 +96,7 @@ fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): * Construct a [Behavior] that reacts to incoming signals, provides access to the [ActorContext] and returns the * actor's next behavior. */ -fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { +fun <T : Any> receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = handler(ctx, signal) } @@ -85,6 +106,6 @@ fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal * Construct a [Behavior] that wraps another behavior instance and uses a [BehaviorInterpreter] to pass incoming * messages and signals to the wrapper behavior. */ -fun <T : Any> Behavior.Companion.wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { - return Behavior.setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } +fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { + return setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt index 20aab232..b963cdb6 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt @@ -33,6 +33,7 @@ import com.atlarge.odcsim.internal.SuspendingBehaviorImpl import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.suspendCoroutine +import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn /** * A [Behavior] that allows method calls to suspend execution via Kotlin coroutines. @@ -41,23 +42,15 @@ import kotlin.coroutines.suspendCoroutine */ abstract class SuspendingBehavior<T : Any> : DeferredBehavior<T>() { /** - * Run the logic of this behavior. + * Run the suspending logic of this behavior. * * @param ctx The [SuspendingActorContext] in which the behavior is executed. + * @return The next behavior for the actor. */ - abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>) + abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>): Behavior<T> // Immediately transfer to implementation - override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this) -} - -/** - * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. - */ -fun <T : Any> Behavior.Companion.suspending(block: suspend (SuspendingActorContext<T>) -> Unit): Behavior<T> { - return object : SuspendingBehavior<T>() { - override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) - } + override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this).start() } /** @@ -102,3 +95,22 @@ suspend fun <T : Any, U> suspendWithBehavior(block: (Continuation<U>, () -> Beha ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") ctx.become(block(cont) { ctx.behavior }) } + +/** + * Obtain the current [SuspendingActorContext] instance for the active continuation. + */ +suspend fun <T : Any> actorContext(): SuspendingActorContext<T> = + suspendCoroutineUninterceptedOrReturn { cont -> + @Suppress("UNCHECKED_CAST") + cont.context[SuspendingActorContext] as? SuspendingActorContext<T> + ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") + } + +/** + * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. + */ +fun <T : Any> suspending(block: suspend (SuspendingActorContext<T>) -> Behavior<T>): Behavior<T> { + return object : SuspendingBehavior<T>() { + override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) + } +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt new file mode 100644 index 00000000..7c09734b --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines.dsl + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.unhandled +import kotlin.coroutines.resume + +/** + * Receive only messages of type [U] and mark all other messages as unhandled. + * + * @return The received message. + */ +suspend inline fun <T : Any, reified U : T> SuspendingActorContext<T>.receiveOf(): U = + suspendWithBehavior<T, U> { cont, next -> + receiveMessage { msg -> + if (msg is U) { + cont.resume(msg) + next() + } else { + unhandled() + } + } + } + +/** + * Send the specified message to the given reference and wait for a reply. + * + * @param ref The actor to send the message to. + * @param after The delay after which the message should be received by the actor. + * @param transform The block to transform `self` to a message. + */ +suspend inline fun <T : Any, U : Any, reified V : T> SuspendingActorContext<T>.ask(ref: ActorRef<U>, + after: Duration = 0.0, + transform: (ActorRef<T>) -> U): V { + send(ref, transform(self), after) + return receiveOf() +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt index 832045a4..45cbce69 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -24,15 +24,13 @@ package com.atlarge.odcsim.coroutines.dsl -import com.atlarge.odcsim.ActorContext -import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.Duration -import com.atlarge.odcsim.ReceivingBehavior -import com.atlarge.odcsim.Signal import com.atlarge.odcsim.Timeout import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.suspendWithBehavior -import com.atlarge.odcsim.internal.send +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.receiveSignal +import com.atlarge.odcsim.unhandled import kotlin.coroutines.resume /** @@ -42,17 +40,17 @@ import kotlin.coroutines.resume */ suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) = suspendWithBehavior<T, Unit> { cont, next -> - object : ReceivingBehavior<T>() { - init { - self.send(Timeout(this), after) - } - - override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = - if (signal is Timeout && signal.target == this) { + setup { ctx -> + val target = this + @Suppress("UNCHECKED_CAST") + ctx.send(ctx.self, Timeout(target) as T, after) + receiveSignal { _, signal -> + if (signal is Timeout && signal.target == target) { cont.resume(Unit) next() } else { unhandled() } + } } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt index 7653431c..f1aba25e 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2018 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,14 +24,20 @@ package com.atlarge.odcsim.internal +import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Signal /** - * Send the specified system [Signal] to the given [ActorRef]. + * Send the specified [Signal] to the given actor reference after the specified duration. + * + * @param ref The actor to send the signal to. + * @param signal The signal to send to the referenced actor. + * @param after The delay after which the signal should be received by the actor. */ -fun <T : Any> ActorRef<T>.send(signal: Signal, duration: Duration) { +fun ActorContext<*>.sendSignal(ref: ActorRef<*>, signal: Signal, after: Duration = 0.0) { + // Signals are currently processed as regular messages @Suppress("UNCHECKED_CAST") - send(signal as T, duration) + send(ref as ActorRef<Any>, signal, after) } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 7f2d39bc..70294ae5 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -27,14 +27,15 @@ package com.atlarge.odcsim.internal import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Instant import com.atlarge.odcsim.ReceivingBehavior import com.atlarge.odcsim.Signal import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.SuspendingBehavior -import com.atlarge.odcsim.dsl.receiveMessage -import com.atlarge.odcsim.dsl.receiveSignal +import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.receiveMessage import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -63,8 +64,10 @@ interface SuspendingActorContextImpl<T : Any> : SuspendingActorContext<T> { * interface. * This implementation uses the fact that each actor is thread-safe (as it processes its mailbox sequentially). */ -internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, initialBehavior: SuspendingBehavior<T>) : - ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { +internal class SuspendingBehaviorImpl<T : Any>( + private var actorContext: ActorContext<T>, + initialBehavior: SuspendingBehavior<T>) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { + /** * The next behavior to use. */ @@ -75,16 +78,6 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in */ private val interpreter = BehaviorInterpreter(initialBehavior) - /** - * The current active [ActorContext]. - */ - private var actorContext: ActorContext<T> - - init { - this.actorContext = actorContext - this.start() - } - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { this.actorContext = ctx return interpreter.also { it.interpretMessage(ctx, msg) }.propagate(next) @@ -99,10 +92,18 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in override val time: Instant get() = actorContext.time + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = actorContext.send(ref, msg, after) + override fun <U : Any> spawn(behavior: Behavior<U>, name: String) = actorContext.spawn(behavior, name) override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun sync(target: ActorRef<*>) = actorContext.sync(target) + + override fun unsync(target: ActorRef<*>) = actorContext.unsync(target) + + override fun isSync(target: ActorRef<*>): Boolean = actorContext.isSync(target) + override suspend fun receive(): T = suspendWithBehavior<T, T> { cont, next -> receiveMessage { msg -> cont.resume(msg) @@ -128,10 +129,11 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in /** * Start the suspending behavior. */ - private fun start() { + internal fun start(): Behavior<T> { val behavior = interpreter.behavior as SuspendingBehavior<T> - val block: suspend () -> Unit = { behavior(this) } + val block: suspend () -> Behavior<T> = { behavior(this) } block.startCoroutine(SuspendingBehaviorImplContinuation()) + return next } /** @@ -144,13 +146,15 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in /** * The continuation of suspending behavior. */ - private inner class SuspendingBehaviorImplContinuation : Continuation<Unit> { + private inner class SuspendingBehaviorImplContinuation : Continuation<Behavior<T>> { override val context = this@SuspendingBehaviorImpl - override fun resumeWith(result: Result<Unit>) { - // TODO Add some form of error handling here - stop() - next = Behavior.stopped() + override fun resumeWith(result: Result<Behavior<T>>) { + if (result.isSuccess) { + next = result.getOrNull()!! + } else if (result.isFailure) { + throw result.exceptionOrNull()!! + } } } } |
