diff options
Diffstat (limited to 'odcsim-core')
10 files changed, 646 insertions, 164 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 87b2f28a..1bc41b44 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -174,164 +174,25 @@ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior */ val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior +// The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending +// it. /** * A special [Behavior] instance that signals that the actor has stopped. */ -private object StoppedBehavior : Behavior<Any>() { +internal object StoppedBehavior : Behavior<Any>() { override fun toString() = "Stopped" } /** * A special [Behavior] object to signal that the actor wants to reuse its previous behavior. */ -private object SameBehavior : Behavior<Nothing>() { +internal object SameBehavior : Behavior<Nothing>() { override fun toString() = "Same" } /** * A special [Behavior] object that indicates that the last message or signal was not handled. */ -private object UnhandledBehavior : Behavior<Nothing>() { +internal object UnhandledBehavior : Behavior<Nothing>() { override fun toString() = "Unhandled" } - -/** - * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of - * [Behavior] instances. - * - * @param initialBehavior The initial behavior to use. - */ -class BehaviorInterpreter<T : Any>(initialBehavior: Behavior<T>) { - /** - * The current [Behavior] instance. - */ - var behavior: Behavior<T> = initialBehavior - - /** - * A flag to indicate whether the current behavior is still alive. - */ - val isAlive: Boolean get() = behavior.isAlive - - /** - * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified - * context. - * - * @param initialBehavior The initial behavior of the actor. - * @param ctx The [ActorContext] to run the behavior in. - */ - constructor(initialBehavior: Behavior<T>, ctx: ActorContext<T>) : this(initialBehavior) { - start(ctx) - } - - /** - * Start the initial behavior. - * - * @param ctx The [ActorContext] to start the behavior in. - */ - fun start(ctx: ActorContext<T>) { - behavior = validateAsInitial(start(behavior, ctx)) - } - - /** - * Stop the current active behavior and move into the stopped state. - * - * @param ctx The [ActorContext] this takes place in. - */ - fun stop(ctx: ActorContext<T>) { - behavior = start(StoppedBehavior.narrow(), ctx) - } - - /** - * Interpret the given message of type [T] using the current active behavior. - * - * @return `true` if the message was handled by the active behavior, `false` otherwise. - */ - fun interpretMessage(ctx: ActorContext<T>, msg: T): Boolean = interpret(ctx, msg, false) - - /** - * Interpret the given [Signal] using the current active behavior. - * - * @return `true` if the signal was handled by the active behavior, `false` otherwise. - */ - fun interpretSignal(ctx: ActorContext<T>, signal: Signal): Boolean = interpret(ctx, signal, true) - - /** - * Interpret the given message or signal using the current active behavior. - * - * @return `true` if the message or signal was handled by the active behavior, `false` otherwise. - */ - private fun interpret(ctx: ActorContext<T>, msg: Any, isSignal: Boolean): Boolean = - if (isAlive) { - val next = when (val current = behavior) { - is DeferredBehavior<T> -> - throw IllegalStateException("Deferred [$current] should not be passed to interpreter") - is ReceivingBehavior<T> -> - if (isSignal) - current.receiveSignal(ctx, msg as Signal) - else - @Suppress("UNCHECKED_CAST") - current.receive(ctx, msg as T) - is SameBehavior, is UnhandledBehavior -> - throw IllegalStateException("Cannot execute with [$current] as behavior") - is StoppedBehavior -> current - } - - val unhandled = next.isUnhandled - behavior = canonicalize(next, behavior, ctx) - !unhandled - } else { - false - } - - /** - * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if - * the [Behavior] is not valid. - * - * @param behavior The behavior to validate. - */ - private fun validateAsInitial(behavior: Behavior<T>): Behavior<T> = - when (behavior) { - is SameBehavior, is UnhandledBehavior -> - throw IllegalArgumentException("Cannot use [$behavior] as initial behavior") - else -> behavior - } - - /** - * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the - * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects. - */ - companion object { - /** - * Start the initial behavior of an actor in the specified [ActorContext]. - * - * This will activate the initial behavior and canonicalize the resulting behavior. - * - * @param behavior The initial behavior to start. - * @param ctx The [ActorContext] to start the behavior in. - * @return The behavior that has been started. - */ - tailrec fun <T : Any> start(behavior: Behavior<T>, ctx: ActorContext<T>): Behavior<T> = - when (behavior) { - is DeferredBehavior<T> -> start(behavior(ctx), ctx) - else -> behavior - } - - /** - * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of - * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or - * signal. - * - * @param behavior The actor's next behavior. - * @param current The actor's current behavior. - * @param ctx The context in which the actor runs. - * @return The actor's canonicalized next behavior. - */ - tailrec fun <T : Any> canonicalize(behavior: Behavior<T>, current: Behavior<T>, ctx: ActorContext<T>): Behavior<T> = - when (behavior) { - is SameBehavior, current -> current - is UnhandledBehavior -> current - is DeferredBehavior<T> -> canonicalize(behavior(ctx), current, ctx) - else -> behavior - } - } -} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt index 635bf9ac..35aad18b 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt @@ -39,3 +39,13 @@ object PreStart : Signal * Lifecycle signal that is fired after this actor and all its child actors (transitively) have terminated. */ object PostStop : Signal + +/** + * A [Signal] to indicate an actor has timed out. + * + * This class contains a [target] property in order to allow nested behavior to function properly when multiple layers + * are waiting on this signal. + * + * @property target The target object that has timed out. + */ +class Timeout(val target: Any) : Signal diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt new file mode 100644 index 00000000..4951d619 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt @@ -0,0 +1,97 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.internal.SuspendingBehaviorContext +import com.atlarge.odcsim.internal.SuspendingBehaviorImpl +import kotlin.coroutines.Continuation +import kotlin.coroutines.suspendCoroutine + +/** + * A [Behavior] that allows method calls to suspend execution via Kotlin coroutines. + * + * @param T The shape of the messages the actor accepts. + */ +abstract class SuspendingBehavior<T : Any> : DeferredBehavior<T>() { + /** + * Run the logic of this behavior. + * + * @param ctx The [SuspendingActorContext] in which the behavior is executed. + */ + abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>) + + // Immediately transfer to implementation + override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this) +} + +/** + * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. + */ +fun <T : Any> Behavior.Companion.suspending(block: suspend (SuspendingActorContext<T>) -> Unit): Behavior<T> { + return object : SuspendingBehavior<T>() { + override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) + } +} + +/** + * An [ActorContext] that provides additional functionality for receiving messages and signals from + * the actor's mailbox. + * + * @param T The shape of the messages the actor accepts. + */ +interface SuspendingActorContext<T : Any> : ActorContext<T> { + /** + * Suspend execution of the active coroutine to wait for a message of type [T] to be received in the actor's + * mailbox. During suspension, incoming signals will be marked unhandled. + * + * @return The message of type [T] that has been received. + */ + suspend fun receive(): T + + /** + * Suspend execution of the active coroutine to wait for a [Signal] to be received in the actor's mailbox. + * During suspension, incoming messages will be marked unhandled. + * + * @return The [Signal] that has been received. + */ + suspend fun receiveSignal(): Signal +} + +/** + * Obtains the current continuation instance inside suspend functions and suspends currently running coroutine. [block] + * should return a [Behavior] that will resume the continuation and return the next behavior which is supplied via the + * second argument of the block. + */ +suspend fun <T : Any, U> suspendWithBehavior(block: (Continuation<U>, () -> Behavior<T>) -> Behavior<T>): U = + suspendCoroutine { cont -> + @Suppress("UNCHECKED_CAST") + val ctx = cont.context[SuspendingBehaviorContext] as? SuspendingBehaviorContext<T> + ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") + ctx.become(block(cont) { ctx.behavior }) + } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt new file mode 100644 index 00000000..832045a4 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -0,0 +1,58 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines.dsl + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.Timeout +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.internal.send +import kotlin.coroutines.resume + +/** + * Block execution for the specified duration. + * + * @param after The duration after which execution should continue. + */ +suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) = + suspendWithBehavior<T, Unit> { cont, next -> + object : ReceivingBehavior<T>() { + init { + self.send(Timeout(this), after) + } + + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = + if (signal is Timeout && signal.target == this) { + cont.resume(Unit) + next() + } else { + unhandled() + } + } + } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt index 590595d3..d4b092d2 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt @@ -26,10 +26,12 @@ package com.atlarge.odcsim.dsl import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.BehaviorInterpreter import com.atlarge.odcsim.DeferredBehavior import com.atlarge.odcsim.ReceivingBehavior import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.internal.BehaviorInterpreter +import com.atlarge.odcsim.internal.EmptyBehavior +import com.atlarge.odcsim.internal.IgnoreBehavior /** * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. @@ -46,29 +48,11 @@ fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow() /** - * A [Behavior] object that ignores all messages sent to the actor. - */ -private object IgnoreBehavior : ReceivingBehavior<Any>() { - override fun receive(ctx: ActorContext<Any>, msg: Any): Behavior<Any> = this - - override fun receiveSignal(ctx: ActorContext<Any>, signal: Signal): Behavior<Any> = this - - override fun toString() = "Ignore" -} - -/** * A [Behavior] that treats every incoming message or signal as unhandled. */ fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow() /** - * A [Behavior] object that does not handle any message it receives. - */ -private object EmptyBehavior : ReceivingBehavior<Any>() { - override fun toString() = "Empty" -} - -/** * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the * actor's next behavior. */ diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt new file mode 100644 index 00000000..7653431c --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt @@ -0,0 +1,37 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.Signal + +/** + * Send the specified system [Signal] to the given [ActorRef]. + */ +fun <T : Any> ActorRef<T>.send(signal: Signal, duration: Duration) { + @Suppress("UNCHECKED_CAST") + send(signal as T, duration) +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt new file mode 100644 index 00000000..b07cabc0 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal + +/** + * A [Behavior] object that ignores all messages sent to the actor. + */ +internal object IgnoreBehavior : ReceivingBehavior<Any>() { + override fun receive(ctx: ActorContext<Any>, msg: Any): Behavior<Any> = this + + override fun receiveSignal(ctx: ActorContext<Any>, signal: Signal): Behavior<Any> = this + + override fun toString() = "Ignore" +} + +/** + * A [Behavior] object that does not handle any message it receives. + */ +internal object EmptyBehavior : ReceivingBehavior<Any>() { + override fun toString() = "Empty" +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt new file mode 100644 index 00000000..194c2a62 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt @@ -0,0 +1,201 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.SameBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.StoppedBehavior +import com.atlarge.odcsim.UnhandledBehavior +import com.atlarge.odcsim.isAlive +import com.atlarge.odcsim.isUnhandled + +/** + * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of + * [Behavior] instances. + * + * @param initialBehavior The initial behavior to use. + */ +class BehaviorInterpreter<T : Any>(initialBehavior: Behavior<T>) { + /** + * The current [Behavior] instance. + */ + var behavior: Behavior<T> = initialBehavior + private set + + /** + * A flag to indicate the interpreter is still alive. + */ + val isAlive: Boolean get() = behavior.isAlive + + /** + * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified + * context. + * + * @param initialBehavior The initial behavior of the actor. + * @param ctx The [ActorContext] to run the behavior in. + */ + constructor(initialBehavior: Behavior<T>, ctx: ActorContext<T>) : this(initialBehavior) { + start(ctx) + } + + /** + * Start the initial behavior. + * + * @param ctx The [ActorContext] to start the behavior in. + */ + fun start(ctx: ActorContext<T>) { + behavior = validateAsInitial(start(ctx, behavior)) + } + + /** + * Stop the current active behavior and move into the stopped state. + * + * @param ctx The [ActorContext] this takes place in. + */ + fun stop(ctx: ActorContext<T>) { + behavior = start(ctx, StoppedBehavior.narrow()) + } + + /** + * Replace the current behavior with the specified new behavior. + * + * @param ctx The [ActorContext] to run the behavior in. + * @param next The behavior to replace the current behavior with. + */ + fun become(ctx: ActorContext<T>, next: Behavior<T>) { + this.behavior = canonicalize(ctx, behavior, next) + } + + /** + * Propagate special states of the wrapper [Behavior] to the specified [Behavior]. This means + * that if the behavior of this interpreter is stopped or unhandled, this will be propagated. + * + * @param behavior The [Behavior] to map. + * @return Either the specified [Behavior] or the propagated special objects. + */ + fun propagate(behavior: Behavior<T>): Behavior<T> = + if (this.behavior.isUnhandled || !this.behavior.isAlive) + this.behavior + else + behavior + + /** + * Interpret the given message of type [T] using the current active behavior. + * + * @return `true` if the message was handled by the active behavior, `false` otherwise. + */ + fun interpretMessage(ctx: ActorContext<T>, msg: T): Boolean = interpret(ctx, msg, false) + + /** + * Interpret the given [Signal] using the current active behavior. + * + * @return `true` if the signal was handled by the active behavior, `false` otherwise. + */ + fun interpretSignal(ctx: ActorContext<T>, signal: Signal): Boolean = interpret(ctx, signal, true) + + /** + * Interpret the given message or signal using the current active behavior. + * + * @return `true` if the message or signal was handled by the active behavior, `false` otherwise. + */ + private fun interpret(ctx: ActorContext<T>, msg: Any, isSignal: Boolean): Boolean = + if (isAlive) { + val next = when (val current = behavior) { + is DeferredBehavior<T> -> + throw IllegalStateException("Deferred [$current] should not be passed to interpreter") + is ReceivingBehavior<T> -> + if (isSignal) + current.receiveSignal(ctx, msg as Signal) + else + @Suppress("UNCHECKED_CAST") + current.receive(ctx, msg as T) + is SameBehavior, is UnhandledBehavior -> + throw IllegalStateException("Cannot execute with [$current] as behavior") + is StoppedBehavior -> current + } + + val unhandled = next.isUnhandled + behavior = canonicalize(ctx, behavior, next) + !unhandled + } else { + false + } + + /** + * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if + * the [Behavior] is not valid. + * + * @param behavior The behavior to validate. + */ + private fun validateAsInitial(behavior: Behavior<T>): Behavior<T> = + when (behavior) { + is SameBehavior, is UnhandledBehavior -> + throw IllegalArgumentException("Cannot use [$behavior] as initial behavior") + else -> behavior + } + + /** + * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the + * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects. + */ + companion object { + /** + * Start the initial behavior of an actor in the specified [ActorContext]. + * + * This will activate the initial behavior and canonicalize the resulting behavior. + * + * @param ctx The [ActorContext] to start the behavior in. + * @param behavior The initial behavior to start. + * @return The behavior that has been started. + */ + tailrec fun <T : Any> start(ctx: ActorContext<T>, behavior: Behavior<T>): Behavior<T> = + when (behavior) { + is DeferredBehavior<T> -> start(ctx, behavior(ctx)) + else -> behavior + } + + /** + * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of + * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or + * signal. + * + * @param ctx The context in which the actor runs. + * @param current The actor's current behavior. + * @param next The actor's next behavior. + * @return The actor's canonicalized next behavior. + */ + tailrec fun <T : Any> canonicalize(ctx: ActorContext<T>, current: Behavior<T>, next: Behavior<T>): Behavior<T> = + when (next) { + is SameBehavior, current -> current + is UnhandledBehavior -> current + is DeferredBehavior<T> -> canonicalize(ctx, current, next(ctx)) + else -> next + } + } +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt new file mode 100644 index 00000000..d8c3e8fc --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -0,0 +1,169 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.internal + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Instant +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.SuspendingBehavior +import com.atlarge.odcsim.dsl.receiveMessage +import com.atlarge.odcsim.dsl.receiveSignal +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import kotlin.coroutines.Continuation +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.resume +import kotlin.coroutines.startCoroutine + +/** + * The interface that is exposed from the [CoroutineContext] and provides control over the [SuspendingBehavior] + * instance. + */ +interface SuspendingBehaviorContext<T : Any> : CoroutineContext.Element { + /** + * The [ActorContext] in which the actor currently runs. + */ + val context: SuspendingActorContext<T> + + /** + * The current active behavior + */ + val behavior: Behavior<T> + + /** + * Replace the current active behavior with the specified new behavior. + * + * @param next The behavior to replace the current behavior with. + */ + fun become(next: Behavior<T>) + + /** + * This key provides users access to an untyped actor context in case the coroutine runs inside a + * [SuspendingBehavior]. + */ + companion object Key : CoroutineContext.Key<SuspendingBehaviorContext<*>> +} + +/** + * Implementation of [SuspendingBehavior] class that maps the suspending method calls to the [Behavior] + * interface. + * This implementation uses the fact that each actor is thread-safe (as it processes its mailbox sequentially). + */ +internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, initialBehavior: SuspendingBehavior<T>) : + ReceivingBehavior<T>(), SuspendingActorContext<T>, SuspendingBehaviorContext<T> { + /** + * The next behavior to use. + */ + private var next: Behavior<T> = this + + /** + * The [BehaviorInterpreter] to wrap the suspending behavior. + */ + private val interpreter = BehaviorInterpreter(initialBehavior) + + /** + * The current active [ActorContext]. + */ + private var actorContext: ActorContext<T> + + init { + this.actorContext = actorContext + this.start() + } + + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + this.actorContext = ctx + return interpreter.also { it.interpretMessage(ctx, msg) }.propagate(next) + } + + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + this.actorContext = ctx + return interpreter.also { it.interpretSignal(ctx, signal) }.propagate(next) + } + + override val self: ActorRef<T> get() = actorContext.self + + override val time: Instant get() = actorContext.time + + override fun <U : Any> spawn(behavior: Behavior<U>, name: String) = actorContext.spawn(behavior, name) + + override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + + override suspend fun receive(): T = suspendWithBehavior<T, T> { cont, next -> + receiveMessage { msg -> + cont.resume(msg) + next() + } + } + + override suspend fun receiveSignal(): Signal = suspendWithBehavior<T, Signal> { cont, next -> + receiveSignal { _, signal -> + cont.resume(signal) + next() + } + } + + override val context: SuspendingActorContext<T> get() = this + + override val behavior: Behavior<T> get() = interpreter.behavior + + override fun become(next: Behavior<T>) { + interpreter.become(actorContext, next) + } + + override val key: CoroutineContext.Key<*> = SuspendingBehaviorContext.Key + + /** + * Start the suspending behavior. + */ + private fun start() { + val behavior = interpreter.behavior as SuspendingBehavior<T> + val block: suspend () -> Unit = { behavior(this) } + block.startCoroutine(SuspendingBehaviorImplContinuation()) + } + + /** + * Stop the suspending behavior. + */ + private fun stop() { + this.interpreter.stop(actorContext) + } + + /** + * The continuation of suspending behavior. + */ + private inner class SuspendingBehaviorImplContinuation : Continuation<Unit> { + override val context = this@SuspendingBehaviorImpl + + override fun resumeWith(result: Result<Unit>) { + // TODO Add some form of error handling here + stop() + next = Behavior.stopped() + } + } +} diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt index 73b060b9..492dc686 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt @@ -250,7 +250,6 @@ abstract class ActorSystemTest { Behavior.ignore() }, "child") - Behavior.receive { ctx2, msg -> assertTrue(ctx2.stop(child)) msg.send(Unit) // This actor should be stopped now and not receive the message anymore @@ -332,5 +331,23 @@ abstract class ActorSystemTest { val system = factory(Behavior.stopped<Unit>(), "test") system.run() } + + /** + * Test whether deferred behavior that returns [Behavior.Companion.same] fails. + */ + @Test + fun `should not allow setup to return same`() { + val system = factory(Behavior.setup<Unit> { Behavior.same() }, "test") + assertThrows<IllegalArgumentException> { system.run() } + } + + /** + * Test whether deferred behavior that returns [Behavior.Companion.unhandled] fails. + */ + @Test + fun `should not allow setup to return unhandled`() { + val system = factory(Behavior.setup<Unit> { Behavior.unhandled() }, "test") + assertThrows<IllegalArgumentException> { system.run() } + } } } |
