From da9dd0c3cb3d9a6b72b6fb4efd257d0449711f17 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 2 Nov 2018 22:50:08 +0100 Subject: feat: Add support for suspending behavior This change adds support for suspending behavior via Kotlin's coroutine feature officially introduced in Kotlin 1.3. The syntax is similar to the 1.x version of the simulator to allow for easier porting to 2.x. --- .../src/main/kotlin/com/atlarge/odcsim/Behavior.kt | 149 +-------------- .../src/main/kotlin/com/atlarge/odcsim/Signals.kt | 10 + .../com/atlarge/odcsim/coroutines/Behavior.kt | 97 ++++++++++ .../com/atlarge/odcsim/coroutines/dsl/Timeout.kt | 58 ++++++ .../kotlin/com/atlarge/odcsim/dsl/Behaviors.kt | 22 +-- .../kotlin/com/atlarge/odcsim/internal/ActorRef.kt | 37 ++++ .../kotlin/com/atlarge/odcsim/internal/Behavior.kt | 48 +++++ .../atlarge/odcsim/internal/BehaviorInterpreter.kt | 201 +++++++++++++++++++++ .../com/atlarge/odcsim/internal/Coroutines.kt | 169 +++++++++++++++++ .../kotlin/com/atlarge/odcsim/ActorSystemTest.kt | 19 +- 10 files changed, 646 insertions(+), 164 deletions(-) create mode 100644 odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt create mode 100644 odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt create mode 100644 odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt create mode 100644 odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt create mode 100644 odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt create mode 100644 odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt (limited to 'odcsim-core') diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 87b2f28a..1bc41b44 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -174,164 +174,25 @@ val Behavior.isAlive get() = this !is StoppedBehavior */ val Behavior.isUnhandled get() = this !is UnhandledBehavior +// The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending +// it. /** * A special [Behavior] instance that signals that the actor has stopped. */ -private object StoppedBehavior : Behavior() { +internal object StoppedBehavior : Behavior() { override fun toString() = "Stopped" } /** * A special [Behavior] object to signal that the actor wants to reuse its previous behavior. */ -private object SameBehavior : Behavior() { +internal object SameBehavior : Behavior() { override fun toString() = "Same" } /** * A special [Behavior] object that indicates that the last message or signal was not handled. */ -private object UnhandledBehavior : Behavior() { +internal object UnhandledBehavior : Behavior() { override fun toString() = "Unhandled" } - -/** - * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of - * [Behavior] instances. - * - * @param initialBehavior The initial behavior to use. - */ -class BehaviorInterpreter(initialBehavior: Behavior) { - /** - * The current [Behavior] instance. - */ - var behavior: Behavior = initialBehavior - - /** - * A flag to indicate whether the current behavior is still alive. - */ - val isAlive: Boolean get() = behavior.isAlive - - /** - * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified - * context. - * - * @param initialBehavior The initial behavior of the actor. - * @param ctx The [ActorContext] to run the behavior in. - */ - constructor(initialBehavior: Behavior, ctx: ActorContext) : this(initialBehavior) { - start(ctx) - } - - /** - * Start the initial behavior. - * - * @param ctx The [ActorContext] to start the behavior in. - */ - fun start(ctx: ActorContext) { - behavior = validateAsInitial(start(behavior, ctx)) - } - - /** - * Stop the current active behavior and move into the stopped state. - * - * @param ctx The [ActorContext] this takes place in. - */ - fun stop(ctx: ActorContext) { - behavior = start(StoppedBehavior.narrow(), ctx) - } - - /** - * Interpret the given message of type [T] using the current active behavior. - * - * @return `true` if the message was handled by the active behavior, `false` otherwise. - */ - fun interpretMessage(ctx: ActorContext, msg: T): Boolean = interpret(ctx, msg, false) - - /** - * Interpret the given [Signal] using the current active behavior. - * - * @return `true` if the signal was handled by the active behavior, `false` otherwise. - */ - fun interpretSignal(ctx: ActorContext, signal: Signal): Boolean = interpret(ctx, signal, true) - - /** - * Interpret the given message or signal using the current active behavior. - * - * @return `true` if the message or signal was handled by the active behavior, `false` otherwise. - */ - private fun interpret(ctx: ActorContext, msg: Any, isSignal: Boolean): Boolean = - if (isAlive) { - val next = when (val current = behavior) { - is DeferredBehavior -> - throw IllegalStateException("Deferred [$current] should not be passed to interpreter") - is ReceivingBehavior -> - if (isSignal) - current.receiveSignal(ctx, msg as Signal) - else - @Suppress("UNCHECKED_CAST") - current.receive(ctx, msg as T) - is SameBehavior, is UnhandledBehavior -> - throw IllegalStateException("Cannot execute with [$current] as behavior") - is StoppedBehavior -> current - } - - val unhandled = next.isUnhandled - behavior = canonicalize(next, behavior, ctx) - !unhandled - } else { - false - } - - /** - * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if - * the [Behavior] is not valid. - * - * @param behavior The behavior to validate. - */ - private fun validateAsInitial(behavior: Behavior): Behavior = - when (behavior) { - is SameBehavior, is UnhandledBehavior -> - throw IllegalArgumentException("Cannot use [$behavior] as initial behavior") - else -> behavior - } - - /** - * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the - * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects. - */ - companion object { - /** - * Start the initial behavior of an actor in the specified [ActorContext]. - * - * This will activate the initial behavior and canonicalize the resulting behavior. - * - * @param behavior The initial behavior to start. - * @param ctx The [ActorContext] to start the behavior in. - * @return The behavior that has been started. - */ - tailrec fun start(behavior: Behavior, ctx: ActorContext): Behavior = - when (behavior) { - is DeferredBehavior -> start(behavior(ctx), ctx) - else -> behavior - } - - /** - * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of - * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or - * signal. - * - * @param behavior The actor's next behavior. - * @param current The actor's current behavior. - * @param ctx The context in which the actor runs. - * @return The actor's canonicalized next behavior. - */ - tailrec fun canonicalize(behavior: Behavior, current: Behavior, ctx: ActorContext): Behavior = - when (behavior) { - is SameBehavior, current -> current - is UnhandledBehavior -> current - is DeferredBehavior -> canonicalize(behavior(ctx), current, ctx) - else -> behavior - } - } -} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt index 635bf9ac..35aad18b 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt @@ -39,3 +39,13 @@ object PreStart : Signal * Lifecycle signal that is fired after this actor and all its child actors (transitively) have terminated. */ object PostStop : Signal + +/** + * A [Signal] to indicate an actor has timed out. + * + * This class contains a [target] property in order to allow nested behavior to function properly when multiple layers + * are waiting on this signal. + * + * @property target The target object that has timed out. + */ +class Timeout(val target: Any) : Signal diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt new file mode 100644 index 00000000..4951d619 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.internal.SuspendingBehaviorContext +import com.atlarge.odcsim.internal.SuspendingBehaviorImpl +import kotlin.coroutines.Continuation +import kotlin.coroutines.suspendCoroutine + +/** + * A [Behavior] that allows method calls to suspend execution via Kotlin coroutines. + * + * @param T The shape of the messages the actor accepts. + */ +abstract class SuspendingBehavior : DeferredBehavior() { + /** + * Run the logic of this behavior. + * + * @param ctx The [SuspendingActorContext] in which the behavior is executed. + */ + abstract suspend operator fun invoke(ctx: SuspendingActorContext) + + // Immediately transfer to implementation + override fun invoke(ctx: ActorContext): Behavior = SuspendingBehaviorImpl(ctx, this) +} + +/** + * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. + */ +fun Behavior.Companion.suspending(block: suspend (SuspendingActorContext) -> Unit): Behavior { + return object : SuspendingBehavior() { + override suspend fun invoke(ctx: SuspendingActorContext) = block(ctx) + } +} + +/** + * An [ActorContext] that provides additional functionality for receiving messages and signals from + * the actor's mailbox. + * + * @param T The shape of the messages the actor accepts. + */ +interface SuspendingActorContext : ActorContext { + /** + * Suspend execution of the active coroutine to wait for a message of type [T] to be received in the actor's + * mailbox. During suspension, incoming signals will be marked unhandled. + * + * @return The message of type [T] that has been received. + */ + suspend fun receive(): T + + /** + * Suspend execution of the active coroutine to wait for a [Signal] to be received in the actor's mailbox. + * During suspension, incoming messages will be marked unhandled. + * + * @return The [Signal] that has been received. + */ + suspend fun receiveSignal(): Signal +} + +/** + * Obtains the current continuation instance inside suspend functions and suspends currently running coroutine. [block] + * should return a [Behavior] that will resume the continuation and return the next behavior which is supplied via the + * second argument of the block. + */ +suspend fun suspendWithBehavior(block: (Continuation, () -> Behavior) -> Behavior): U = + suspendCoroutine { cont -> + @Suppress("UNCHECKED_CAST") + val ctx = cont.context[SuspendingBehaviorContext] as? SuspendingBehaviorContext + ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") + ctx.become(block(cont) { ctx.behavior }) + } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt new file mode 100644 index 00000000..832045a4 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines.dsl + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Timeout +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.internal.send +import kotlin.coroutines.resume + +/** + * Block execution for the specified duration. + * + * @param after The duration after which execution should continue. + */ +suspend fun SuspendingActorContext.timeout(after: Duration) = + suspendWithBehavior { cont, next -> + object : ReceivingBehavior() { + init { + self.send(Timeout(this), after) + } + + override fun receiveSignal(ctx: ActorContext, signal: Signal): Behavior = + if (signal is Timeout && signal.target == this) { + cont.resume(Unit) + next() + } else { + unhandled() + } + } + } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt index 590595d3..d4b092d2 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt @@ -26,10 +26,12 @@ package com.atlarge.odcsim.dsl import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.BehaviorInterpreter import com.atlarge.odcsim.DeferredBehavior import com.atlarge.odcsim.ReceivingBehavior import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.internal.BehaviorInterpreter +import com.atlarge.odcsim.internal.EmptyBehavior +import com.atlarge.odcsim.internal.IgnoreBehavior /** * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. @@ -45,29 +47,11 @@ fun Behavior.Companion.setup(block: (ActorContext) -> Behavior): */ fun Behavior.Companion.ignore(): Behavior = IgnoreBehavior.narrow() -/** - * A [Behavior] object that ignores all messages sent to the actor. - */ -private object IgnoreBehavior : ReceivingBehavior() { - override fun receive(ctx: ActorContext, msg: Any): Behavior = this - - override fun receiveSignal(ctx: ActorContext, signal: Signal): Behavior = this - - override fun toString() = "Ignore" -} - /** * A [Behavior] that treats every incoming message or signal as unhandled. */ fun Behavior.Companion.empty(): Behavior = EmptyBehavior.narrow() -/** - * A [Behavior] object that does not handle any message it receives. - */ -private object EmptyBehavior : ReceivingBehavior() { - override fun toString() = "Empty" -} - /** * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the * actor's next behavior. diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt new file mode 100644 index 00000000..7653431c --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.Signal + +/** + * Send the specified system [Signal] to the given [ActorRef]. + */ +fun ActorRef.send(signal: Signal, duration: Duration) { + @Suppress("UNCHECKED_CAST") + send(signal as T, duration) +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt new file mode 100644 index 00000000..b07cabc0 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal + +/** + * A [Behavior] object that ignores all messages sent to the actor. + */ +internal object IgnoreBehavior : ReceivingBehavior() { + override fun receive(ctx: ActorContext, msg: Any): Behavior = this + + override fun receiveSignal(ctx: ActorContext, signal: Signal): Behavior = this + + override fun toString() = "Ignore" +} + +/** + * A [Behavior] object that does not handle any message it receives. + */ +internal object EmptyBehavior : ReceivingBehavior() { + override fun toString() = "Empty" +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt new file mode 100644 index 00000000..194c2a62 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt @@ -0,0 +1,201 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.SameBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.StoppedBehavior +import com.atlarge.odcsim.UnhandledBehavior +import com.atlarge.odcsim.isAlive +import com.atlarge.odcsim.isUnhandled + +/** + * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of + * [Behavior] instances. + * + * @param initialBehavior The initial behavior to use. + */ +class BehaviorInterpreter(initialBehavior: Behavior) { + /** + * The current [Behavior] instance. + */ + var behavior: Behavior = initialBehavior + private set + + /** + * A flag to indicate the interpreter is still alive. + */ + val isAlive: Boolean get() = behavior.isAlive + + /** + * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified + * context. + * + * @param initialBehavior The initial behavior of the actor. + * @param ctx The [ActorContext] to run the behavior in. + */ + constructor(initialBehavior: Behavior, ctx: ActorContext) : this(initialBehavior) { + start(ctx) + } + + /** + * Start the initial behavior. + * + * @param ctx The [ActorContext] to start the behavior in. + */ + fun start(ctx: ActorContext) { + behavior = validateAsInitial(start(ctx, behavior)) + } + + /** + * Stop the current active behavior and move into the stopped state. + * + * @param ctx The [ActorContext] this takes place in. + */ + fun stop(ctx: ActorContext) { + behavior = start(ctx, StoppedBehavior.narrow()) + } + + /** + * Replace the current behavior with the specified new behavior. + * + * @param ctx The [ActorContext] to run the behavior in. + * @param next The behavior to replace the current behavior with. + */ + fun become(ctx: ActorContext, next: Behavior) { + this.behavior = canonicalize(ctx, behavior, next) + } + + /** + * Propagate special states of the wrapper [Behavior] to the specified [Behavior]. This means + * that if the behavior of this interpreter is stopped or unhandled, this will be propagated. + * + * @param behavior The [Behavior] to map. + * @return Either the specified [Behavior] or the propagated special objects. + */ + fun propagate(behavior: Behavior): Behavior = + if (this.behavior.isUnhandled || !this.behavior.isAlive) + this.behavior + else + behavior + + /** + * Interpret the given message of type [T] using the current active behavior. + * + * @return `true` if the message was handled by the active behavior, `false` otherwise. + */ + fun interpretMessage(ctx: ActorContext, msg: T): Boolean = interpret(ctx, msg, false) + + /** + * Interpret the given [Signal] using the current active behavior. + * + * @return `true` if the signal was handled by the active behavior, `false` otherwise. + */ + fun interpretSignal(ctx: ActorContext, signal: Signal): Boolean = interpret(ctx, signal, true) + + /** + * Interpret the given message or signal using the current active behavior. + * + * @return `true` if the message or signal was handled by the active behavior, `false` otherwise. + */ + private fun interpret(ctx: ActorContext, msg: Any, isSignal: Boolean): Boolean = + if (isAlive) { + val next = when (val current = behavior) { + is DeferredBehavior -> + throw IllegalStateException("Deferred [$current] should not be passed to interpreter") + is ReceivingBehavior -> + if (isSignal) + current.receiveSignal(ctx, msg as Signal) + else + @Suppress("UNCHECKED_CAST") + current.receive(ctx, msg as T) + is SameBehavior, is UnhandledBehavior -> + throw IllegalStateException("Cannot execute with [$current] as behavior") + is StoppedBehavior -> current + } + + val unhandled = next.isUnhandled + behavior = canonicalize(ctx, behavior, next) + !unhandled + } else { + false + } + + /** + * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if + * the [Behavior] is not valid. + * + * @param behavior The behavior to validate. + */ + private fun validateAsInitial(behavior: Behavior): Behavior = + when (behavior) { + is SameBehavior, is UnhandledBehavior -> + throw IllegalArgumentException("Cannot use [$behavior] as initial behavior") + else -> behavior + } + + /** + * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the + * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects. + */ + companion object { + /** + * Start the initial behavior of an actor in the specified [ActorContext]. + * + * This will activate the initial behavior and canonicalize the resulting behavior. + * + * @param ctx The [ActorContext] to start the behavior in. + * @param behavior The initial behavior to start. + * @return The behavior that has been started. + */ + tailrec fun start(ctx: ActorContext, behavior: Behavior): Behavior = + when (behavior) { + is DeferredBehavior -> start(ctx, behavior(ctx)) + else -> behavior + } + + /** + * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of + * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or + * signal. + * + * @param ctx The context in which the actor runs. + * @param current The actor's current behavior. + * @param next The actor's next behavior. + * @return The actor's canonicalized next behavior. + */ + tailrec fun canonicalize(ctx: ActorContext, current: Behavior, next: Behavior): Behavior = + when (next) { + is SameBehavior, current -> current + is UnhandledBehavior -> current + is DeferredBehavior -> canonicalize(ctx, current, next(ctx)) + else -> next + } + } +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt new file mode 100644 index 00000000..d8c3e8fc --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -0,0 +1,169 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.SuspendingBehavior +import com.atlarge.odcsim.dsl.receiveMessage +import com.atlarge.odcsim.dsl.receiveSignal +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.startCoroutine + +/** + * The interface that is exposed from the [CoroutineContext] and provides control over the [SuspendingBehavior] + * instance. + */ +interface SuspendingBehaviorContext : CoroutineContext.Element { + /** + * The [ActorContext] in which the actor currently runs. + */ + val context: SuspendingActorContext + + /** + * The current active behavior + */ + val behavior: Behavior + + /** + * Replace the current active behavior with the specified new behavior. + * + * @param next The behavior to replace the current behavior with. + */ + fun become(next: Behavior) + + /** + * This key provides users access to an untyped actor context in case the coroutine runs inside a + * [SuspendingBehavior]. + */ + companion object Key : CoroutineContext.Key> +} + +/** + * Implementation of [SuspendingBehavior] class that maps the suspending method calls to the [Behavior] + * interface. + * This implementation uses the fact that each actor is thread-safe (as it processes its mailbox sequentially). + */ +internal class SuspendingBehaviorImpl(actorContext: ActorContext, initialBehavior: SuspendingBehavior) : + ReceivingBehavior(), SuspendingActorContext, SuspendingBehaviorContext { + /** + * The next behavior to use. + */ + private var next: Behavior = this + + /** + * The [BehaviorInterpreter] to wrap the suspending behavior. + */ + private val interpreter = BehaviorInterpreter(initialBehavior) + + /** + * The current active [ActorContext]. + */ + private var actorContext: ActorContext + + init { + this.actorContext = actorContext + this.start() + } + + override fun receive(ctx: ActorContext, msg: T): Behavior { + this.actorContext = ctx + return interpreter.also { it.interpretMessage(ctx, msg) }.propagate(next) + } + + override fun receiveSignal(ctx: ActorContext, signal: Signal): Behavior { + this.actorContext = ctx + return interpreter.also { it.interpretSignal(ctx, signal) }.propagate(next) + } + + override val self: ActorRef get() = actorContext.self + + override val time: Instant get() = actorContext.time + + override fun spawn(behavior: Behavior, name: String) = actorContext.spawn(behavior, name) + + override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + + override suspend fun receive(): T = suspendWithBehavior { cont, next -> + receiveMessage { msg -> + cont.resume(msg) + next() + } + } + + override suspend fun receiveSignal(): Signal = suspendWithBehavior { cont, next -> + receiveSignal { _, signal -> + cont.resume(signal) + next() + } + } + + override val context: SuspendingActorContext get() = this + + override val behavior: Behavior get() = interpreter.behavior + + override fun become(next: Behavior) { + interpreter.become(actorContext, next) + } + + override val key: CoroutineContext.Key<*> = SuspendingBehaviorContext.Key + + /** + * Start the suspending behavior. + */ + private fun start() { + val behavior = interpreter.behavior as SuspendingBehavior + val block: suspend () -> Unit = { behavior(this) } + block.startCoroutine(SuspendingBehaviorImplContinuation()) + } + + /** + * Stop the suspending behavior. + */ + private fun stop() { + this.interpreter.stop(actorContext) + } + + /** + * The continuation of suspending behavior. + */ + private inner class SuspendingBehaviorImplContinuation : Continuation { + override val context = this@SuspendingBehaviorImpl + + override fun resumeWith(result: Result) { + // TODO Add some form of error handling here + stop() + next = Behavior.stopped() + } + } +} diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt index 73b060b9..492dc686 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt @@ -250,7 +250,6 @@ abstract class ActorSystemTest { Behavior.ignore() }, "child") - Behavior.receive { ctx2, msg -> assertTrue(ctx2.stop(child)) msg.send(Unit) // This actor should be stopped now and not receive the message anymore @@ -332,5 +331,23 @@ abstract class ActorSystemTest { val system = factory(Behavior.stopped(), "test") system.run() } + + /** + * Test whether deferred behavior that returns [Behavior.Companion.same] fails. + */ + @Test + fun `should not allow setup to return same`() { + val system = factory(Behavior.setup { Behavior.same() }, "test") + assertThrows { system.run() } + } + + /** + * Test whether deferred behavior that returns [Behavior.Companion.unhandled] fails. + */ + @Test + fun `should not allow setup to return unhandled`() { + val system = factory(Behavior.setup { Behavior.unhandled() }, "test") + assertThrows { system.run() } + } } } -- cgit v1.2.3