diff options
Diffstat (limited to 'odcsim-core/src')
14 files changed, 344 insertions, 222 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index deccedd1..d5edd93f 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -41,6 +41,20 @@ interface ActorContext<T : Any> { val time: Instant /** + * Send the specified message to the actor referenced by this [ActorRef]. + * + * Please note that callees must guarantee that messages are sent strictly in increasing time. + * If so, this method guarantees that: + * - A message will never be received earlier than specified + * - A message might arrive later than specified if the two actors are not synchronized. + * + * @param ref The actor to send the message to. + * @param msg The message to send to the referenced actor. + * @param after The delay after which the message should be received by the actor. + */ + fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration = 0.0) + + /** * Spawn a child actor from the given [Behavior] and with the specified name. * * @param behavior The behavior of the child actor to spawn. @@ -94,3 +108,4 @@ interface ActorContext<T : Any> { */ fun isSync(target: ActorRef<*>): Boolean } + diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt index bc81813b..71b10325 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt @@ -32,23 +32,12 @@ interface ActorRef<in T : Any> { * The path for this actor (from this actor up to the root actor). */ val path: ActorPath - - /** - * Send the specified message to the actor referenced by this [ActorRef]. - * - * Please note that callees must guarantee that messages are sent strictly in increasing time. - * If so, this method guarantees that: - * - A message will never be received earlier than specified - * - A message might arrive later than specified if the two actors are not synchronized. - * - * @param msg The message to send to the referenced actor. - * @param after The delay after which the message should be received by the actor. - */ - fun send(msg: T, after: Duration = 0.1) } /** * Unsafe helper method for widening the type accepted by this [ActorRef]. */ -@Suppress("UNCHECKED_CAST") -fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> = this as ActorRef<U> +fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> { + @Suppress("UNCHECKED_CAST") + return this as ActorRef<U> +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt index a67f13d9..cbb80541 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt @@ -49,4 +49,12 @@ interface ActorSystem<in T : Any> : ActorRef<T> { * @param until The point until which the simulation should run. */ fun run(until: Duration = Duration.POSITIVE_INFINITY) + + /** + * Send the specified message to the root actor of this [ActorSystem]. + * + * @param msg The message to send to the referenced actor. + * @param after The delay after which the message should be received by the actor. + */ + fun send(msg: T, after: Duration = 0.1) } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 1bc41b44..411915ef 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -24,8 +24,6 @@ package com.atlarge.odcsim -import com.atlarge.odcsim.dsl.wrap - /** * The representation of the behavior of an actor. * @@ -51,35 +49,35 @@ sealed class Behavior<T : Any> { } /** - * Companion object used to bind factory methods to. + * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the + * incoming message or signal. */ - companion object { - /** - * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors - * then these will be stopped as part of the shutdown procedure. - */ - fun <T : Any> stopped(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return StoppedBehavior as Behavior<T> - } + fun orElse(behavior: Behavior<T>): Behavior<T> = + wrap(this) { left -> + wrap(behavior) { right -> + object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + if (left.interpretMessage(ctx, msg)) { + return left.behavior + } else if (right.interpretMessage(ctx, msg)) { + return right.behavior + } + + return unhandled() + } - /** - * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. - */ - fun <T : Any> same(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return SameBehavior as Behavior<T> - } + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + if (left.interpretSignal(ctx, signal)) { + return left.behavior + } else if (right.interpretSignal(ctx, signal)) { + return right.behavior + } - /** - * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will - * reuse the previous behavior. - */ - fun <T : Any> unhandled(): Behavior<T> { - @Suppress("UNCHECKED_CAST") - return UnhandledBehavior as Behavior<T> + return unhandled() + } + } + } } - } } /** @@ -103,68 +101,37 @@ abstract class DeferredBehavior<T : Any> : Behavior<T>() { * The message may either be of the type that the actor declares and which is part of the [ActorRef] signature, * or it may be a system [Signal] that expresses a lifecycle event of either this actor or one of its child actors. * - * @param T The shape of the messages the behavior accepts.* + * @param T The shape of the messages the behavior accepts. */ abstract class ReceivingBehavior<T : Any> : Behavior<T>() { /** * Process an incoming message of type [T] and return the actor's next behavior. * * The returned behavior can in addition to normal behaviors be one of the canned special objects: - * - returning [Behavior.Companion.stopped] will terminate this Behavior - * - returning [Behavior.Companion.same] designates to reuse the current Behavior - * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * - returning [stopped] will terminate this Behavior + * - returning [same] designates to reuse the current Behavior + * - returning [unhandled] keeps the same Behavior and signals that the message was not yet handled * * @param ctx The [ActorContext] in which the actor is currently running. * @param msg The message that was received. */ - open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = Behavior.unhandled() + open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = unhandled() /** * Process an incoming [Signal] and return the actor's next behavior. * * The returned behavior can in addition to normal behaviors be one of the canned special objects: - * - returning [Behavior.Companion.stopped] will terminate this Behavior - * - returning [Behavior.Companion.same] designates to reuse the current Behavior - * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * - returning [stopped] will terminate this Behavior + * - returning [same] designates to reuse the current Behavior + * - returning [unhandled] keeps the same Behavior and signals that the message was not yet handled * * @param ctx The [ActorContext] in which the actor is currently running. * @param signal The [Signal] that was received. */ - open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = Behavior.unhandled() + open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = unhandled() } /** - * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the incoming - * message or signal. - */ -fun <T : Any> Behavior<T>.orElse(behavior: Behavior<T>): Behavior<T> = - Behavior.wrap(this) { left -> - Behavior.wrap(behavior) { right -> - object : ReceivingBehavior<T>() { - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { - if (left.interpretMessage(ctx, msg)) { - return left.behavior - } else if (right.interpretMessage(ctx, msg)) { - return right.behavior - } - - return Behavior.unhandled() - } - - override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { - if (left.interpretSignal(ctx, signal)) { - return left.behavior - } else if (right.interpretSignal(ctx, signal)) { - return right.behavior - } - - return Behavior.unhandled() - } - } - } - } - -/** * A flag to indicate whether a [Behavior] instance is still alive. */ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt index d4b092d2..4dea1007 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behaviors.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2018 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -21,22 +21,43 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ +@file:JvmName("Behaviors") +package com.atlarge.odcsim -package com.atlarge.odcsim.dsl - -import com.atlarge.odcsim.ActorContext -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.DeferredBehavior -import com.atlarge.odcsim.ReceivingBehavior -import com.atlarge.odcsim.Signal import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.EmptyBehavior import com.atlarge.odcsim.internal.IgnoreBehavior /** + * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors + * then these will be stopped as part of the shutdown procedure. + */ +fun <T : Any> stopped(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return StoppedBehavior as Behavior<T> +} + +/** + * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. + */ +fun <T : Any> same(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return SameBehavior as Behavior<T> +} + +/** + * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will + * reuse the previous behavior. + */ +fun <T : Any> unhandled(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return UnhandledBehavior as Behavior<T> +} + +/** * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. */ -fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { +fun <T : Any> setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { return object : DeferredBehavior<T>() { override fun invoke(ctx: ActorContext<T>): Behavior<T> = block(ctx) } @@ -45,18 +66,18 @@ fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): /** * A [Behavior] that ignores any incoming message or signal and keeps the same behavior. */ -fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow() +fun <T : Any> ignore(): Behavior<T> = IgnoreBehavior.narrow() /** * A [Behavior] that treats every incoming message or signal as unhandled. */ -fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow() +fun <T : Any> empty(): Behavior<T> = EmptyBehavior.narrow() /** * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the * actor's next behavior. */ -fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { +fun <T : Any> receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(ctx, msg) } @@ -65,9 +86,9 @@ fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavi /** * Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior. */ -fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): Behavior<T> { +fun <T : Any> receiveMessage(handler: (T) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = onMessage(msg) + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(msg) } } @@ -75,7 +96,7 @@ fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): * Construct a [Behavior] that reacts to incoming signals, provides access to the [ActorContext] and returns the * actor's next behavior. */ -fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { +fun <T : Any> receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { return object : ReceivingBehavior<T>() { override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = handler(ctx, signal) } @@ -85,6 +106,6 @@ fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal * Construct a [Behavior] that wraps another behavior instance and uses a [BehaviorInterpreter] to pass incoming * messages and signals to the wrapper behavior. */ -fun <T : Any> Behavior.Companion.wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { - return Behavior.setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } +fun <T : Any> wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { + return setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt index 20aab232..b963cdb6 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt @@ -33,6 +33,7 @@ import com.atlarge.odcsim.internal.SuspendingBehaviorImpl import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.suspendCoroutine +import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn /** * A [Behavior] that allows method calls to suspend execution via Kotlin coroutines. @@ -41,23 +42,15 @@ import kotlin.coroutines.suspendCoroutine */ abstract class SuspendingBehavior<T : Any> : DeferredBehavior<T>() { /** - * Run the logic of this behavior. + * Run the suspending logic of this behavior. * * @param ctx The [SuspendingActorContext] in which the behavior is executed. + * @return The next behavior for the actor. */ - abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>) + abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>): Behavior<T> // Immediately transfer to implementation - override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this) -} - -/** - * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. - */ -fun <T : Any> Behavior.Companion.suspending(block: suspend (SuspendingActorContext<T>) -> Unit): Behavior<T> { - return object : SuspendingBehavior<T>() { - override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) - } + override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this).start() } /** @@ -102,3 +95,22 @@ suspend fun <T : Any, U> suspendWithBehavior(block: (Continuation<U>, () -> Beha ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") ctx.become(block(cont) { ctx.behavior }) } + +/** + * Obtain the current [SuspendingActorContext] instance for the active continuation. + */ +suspend fun <T : Any> actorContext(): SuspendingActorContext<T> = + suspendCoroutineUninterceptedOrReturn { cont -> + @Suppress("UNCHECKED_CAST") + cont.context[SuspendingActorContext] as? SuspendingActorContext<T> + ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior") + } + +/** + * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals. + */ +fun <T : Any> suspending(block: suspend (SuspendingActorContext<T>) -> Behavior<T>): Behavior<T> { + return object : SuspendingBehavior<T>() { + override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx) + } +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt new file mode 100644 index 00000000..7c09734b --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Receive.kt @@ -0,0 +1,64 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.coroutines.dsl + +import com.atlarge.odcsim.ActorRef +import com.atlarge.odcsim.Duration +import com.atlarge.odcsim.coroutines.SuspendingActorContext +import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.receiveMessage +import com.atlarge.odcsim.unhandled +import kotlin.coroutines.resume + +/** + * Receive only messages of type [U] and mark all other messages as unhandled. + * + * @return The received message. + */ +suspend inline fun <T : Any, reified U : T> SuspendingActorContext<T>.receiveOf(): U = + suspendWithBehavior<T, U> { cont, next -> + receiveMessage { msg -> + if (msg is U) { + cont.resume(msg) + next() + } else { + unhandled() + } + } + } + +/** + * Send the specified message to the given reference and wait for a reply. + * + * @param ref The actor to send the message to. + * @param after The delay after which the message should be received by the actor. + * @param transform The block to transform `self` to a message. + */ +suspend inline fun <T : Any, U : Any, reified V : T> SuspendingActorContext<T>.ask(ref: ActorRef<U>, + after: Duration = 0.0, + transform: (ActorRef<T>) -> U): V { + send(ref, transform(self), after) + return receiveOf() +} diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt index 832045a4..45cbce69 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt @@ -24,15 +24,13 @@ package com.atlarge.odcsim.coroutines.dsl -import com.atlarge.odcsim.ActorContext -import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.Duration -import com.atlarge.odcsim.ReceivingBehavior -import com.atlarge.odcsim.Signal import com.atlarge.odcsim.Timeout import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.suspendWithBehavior -import com.atlarge.odcsim.internal.send +import com.atlarge.odcsim.setup +import com.atlarge.odcsim.receiveSignal +import com.atlarge.odcsim.unhandled import kotlin.coroutines.resume /** @@ -42,17 +40,17 @@ import kotlin.coroutines.resume */ suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) = suspendWithBehavior<T, Unit> { cont, next -> - object : ReceivingBehavior<T>() { - init { - self.send(Timeout(this), after) - } - - override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = - if (signal is Timeout && signal.target == this) { + setup { ctx -> + val target = this + @Suppress("UNCHECKED_CAST") + ctx.send(ctx.self, Timeout(target) as T, after) + receiveSignal { _, signal -> + if (signal is Timeout && signal.target == target) { cont.resume(Unit) next() } else { unhandled() } + } } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt index 7653431c..f1aba25e 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorContext.kt @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2018 atlarge-research + * Copyright (c) 2019 atlarge-research * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,14 +24,20 @@ package com.atlarge.odcsim.internal +import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Signal /** - * Send the specified system [Signal] to the given [ActorRef]. + * Send the specified [Signal] to the given actor reference after the specified duration. + * + * @param ref The actor to send the signal to. + * @param signal The signal to send to the referenced actor. + * @param after The delay after which the signal should be received by the actor. */ -fun <T : Any> ActorRef<T>.send(signal: Signal, duration: Duration) { +fun ActorContext<*>.sendSignal(ref: ActorRef<*>, signal: Signal, after: Duration = 0.0) { + // Signals are currently processed as regular messages @Suppress("UNCHECKED_CAST") - send(signal as T, duration) + send(ref as ActorRef<Any>, signal, after) } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 7f2d39bc..70294ae5 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -27,14 +27,15 @@ package com.atlarge.odcsim.internal import com.atlarge.odcsim.ActorContext import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Instant import com.atlarge.odcsim.ReceivingBehavior import com.atlarge.odcsim.Signal import com.atlarge.odcsim.coroutines.SuspendingActorContext import com.atlarge.odcsim.coroutines.SuspendingBehavior -import com.atlarge.odcsim.dsl.receiveMessage -import com.atlarge.odcsim.dsl.receiveSignal +import com.atlarge.odcsim.receiveSignal import com.atlarge.odcsim.coroutines.suspendWithBehavior +import com.atlarge.odcsim.receiveMessage import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.resume @@ -63,8 +64,10 @@ interface SuspendingActorContextImpl<T : Any> : SuspendingActorContext<T> { * interface. * This implementation uses the fact that each actor is thread-safe (as it processes its mailbox sequentially). */ -internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, initialBehavior: SuspendingBehavior<T>) : - ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { +internal class SuspendingBehaviorImpl<T : Any>( + private var actorContext: ActorContext<T>, + initialBehavior: SuspendingBehavior<T>) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> { + /** * The next behavior to use. */ @@ -75,16 +78,6 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in */ private val interpreter = BehaviorInterpreter(initialBehavior) - /** - * The current active [ActorContext]. - */ - private var actorContext: ActorContext<T> - - init { - this.actorContext = actorContext - this.start() - } - override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { this.actorContext = ctx return interpreter.also { it.interpretMessage(ctx, msg) }.propagate(next) @@ -99,10 +92,18 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in override val time: Instant get() = actorContext.time + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = actorContext.send(ref, msg, after) + override fun <U : Any> spawn(behavior: Behavior<U>, name: String) = actorContext.spawn(behavior, name) override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun sync(target: ActorRef<*>) = actorContext.sync(target) + + override fun unsync(target: ActorRef<*>) = actorContext.unsync(target) + + override fun isSync(target: ActorRef<*>): Boolean = actorContext.isSync(target) + override suspend fun receive(): T = suspendWithBehavior<T, T> { cont, next -> receiveMessage { msg -> cont.resume(msg) @@ -128,10 +129,11 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in /** * Start the suspending behavior. */ - private fun start() { + internal fun start(): Behavior<T> { val behavior = interpreter.behavior as SuspendingBehavior<T> - val block: suspend () -> Unit = { behavior(this) } + val block: suspend () -> Behavior<T> = { behavior(this) } block.startCoroutine(SuspendingBehaviorImplContinuation()) + return next } /** @@ -144,13 +146,15 @@ internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, in /** * The continuation of suspending behavior. */ - private inner class SuspendingBehaviorImplContinuation : Continuation<Unit> { + private inner class SuspendingBehaviorImplContinuation : Continuation<Behavior<T>> { override val context = this@SuspendingBehaviorImpl - override fun resumeWith(result: Result<Unit>) { - // TODO Add some form of error handling here - stop() - next = Behavior.stopped() + override fun resumeWith(result: Result<Behavior<T>>) { + if (result.isSuccess) { + next = result.getOrNull()!! + } else if (result.isFailure) { + throw result.exceptionOrNull()!! + } } } } diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt index 3830f09e..11652606 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt @@ -24,8 +24,6 @@ package com.atlarge.odcsim -import com.atlarge.odcsim.dsl.empty -import com.atlarge.odcsim.dsl.setup import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows @@ -46,7 +44,7 @@ abstract class ActorSystemFactoryTest { fun `should create a system with correct name`() { val factory = createFactory() val name = "test" - val system = factory(Behavior.empty<Unit>(), name) + val system = factory(empty<Unit>(), name) assertEquals(name, system.name) } @@ -57,7 +55,7 @@ abstract class ActorSystemFactoryTest { @Test fun `should create a system with correct root behavior`() { val factory = createFactory() - val system = factory(Behavior.setup<Unit> { throw UnsupportedOperationException() }, "test") + val system = factory(setup<Unit> { throw UnsupportedOperationException() }, "test") assertThrows<UnsupportedOperationException> { system.run(until = 10.0) } } diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt index 3385767b..e9cc3886 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt @@ -24,12 +24,6 @@ package com.atlarge.odcsim -import com.atlarge.odcsim.Behavior.Companion.same -import com.atlarge.odcsim.dsl.empty -import com.atlarge.odcsim.dsl.ignore -import com.atlarge.odcsim.dsl.receive -import com.atlarge.odcsim.dsl.receiveMessage -import com.atlarge.odcsim.dsl.setup import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue @@ -55,7 +49,7 @@ abstract class ActorSystemTest { @Test fun `should have a name`() { val name = "test" - val system = factory(Behavior.empty<Unit>(), name) + val system = factory(empty<Unit>(), name) assertEquals(name, system.name) } @@ -65,7 +59,7 @@ abstract class ActorSystemTest { */ @Test fun `should have a path`() { - val system = factory(Behavior.empty<Unit>(), "test") + val system = factory(empty<Unit>(), "test") assertTrue(system.path is ActorPath.Root) } @@ -75,7 +69,7 @@ abstract class ActorSystemTest { */ @Test fun `should start at t=0`() { - val system = factory(Behavior.empty<Unit>(), name = "test") + val system = factory(empty<Unit>(), name = "test") assertTrue(Math.abs(system.time) < DELTA) } @@ -85,7 +79,7 @@ abstract class ActorSystemTest { */ @Test fun `should not accept negative instants for running`() { - val system = factory(Behavior.empty<Unit>(), name = "test") + val system = factory(empty<Unit>(), name = "test") assertThrows<IllegalArgumentException> { system.run(-10.0) } } @@ -96,7 +90,7 @@ abstract class ActorSystemTest { @Test fun `should not jump backward in time`() { val until = 10.0 - val system = factory(Behavior.empty<Unit>(), name = "test") + val system = factory(empty<Unit>(), name = "test") system.run(until = until) system.run(until = until - 0.5) @@ -109,7 +103,7 @@ abstract class ActorSystemTest { @Test fun `should jump forward in time`() { val until = 10.0 - val system = factory(Behavior.empty<Unit>(), name = "test") + val system = factory(empty<Unit>(), name = "test") system.run(until = until) assertTrue(Math.abs(system.time - until) < DELTA) @@ -120,12 +114,11 @@ abstract class ActorSystemTest { */ @Test fun `should order messages at the instant by insertion time`() { - val behavior = Behavior.receiveMessage<Int> { msg -> + val behavior = receiveMessage<Int> { msg -> assertEquals(1, msg) - - Behavior.receiveMessage { + receiveMessage { assertEquals(2, it) - Behavior.ignore() + ignore() } } val system = factory(behavior, name = "test") @@ -140,7 +133,7 @@ abstract class ActorSystemTest { @Test fun `should not process messages after deadline`() { var counter = 0 - val behavior = Behavior.receiveMessage<Unit> { _ -> + val behavior = receiveMessage<Unit> { _ -> counter++ same() } @@ -156,7 +149,7 @@ abstract class ActorSystemTest { */ @Test fun `should not initialize root actor if not run`() { - factory(Behavior.setup<Unit> { TODO() }, name = "test") + factory(setup<Unit> { TODO() }, name = "test") } @Nested @@ -167,7 +160,7 @@ abstract class ActorSystemTest { */ @Test fun `should disallow messages in the past`() { - val system = factory(Behavior.empty<Unit>(), name = "test") + val system = factory(empty<Unit>(), name = "test") assertThrows<IllegalArgumentException> { system.send(Unit, after = -1.0) } } } @@ -180,9 +173,9 @@ abstract class ActorSystemTest { */ @Test fun `should pre-start at t=0 if root`() { - val behavior = Behavior.setup<Unit> { ctx -> + val behavior = setup<Unit> { ctx -> assertTrue(Math.abs(ctx.time) < DELTA) - Behavior.ignore() + ignore() } val system = factory(behavior, "test") @@ -195,12 +188,12 @@ abstract class ActorSystemTest { @Test fun `should allow spawning of child actors`() { var spawned = false - val behavior = Behavior.setup<Unit> { spawned = true; Behavior.empty() } + val behavior = setup<Unit> { spawned = true; empty() } - val system = factory(Behavior.setup<Unit> { ctx -> + val system = factory(setup<Unit> { ctx -> val ref = ctx.spawn(behavior, "child") assertEquals("child", ref.path.name) - Behavior.ignore() + ignore() }, name = "test") system.run(until = 10.0) @@ -212,12 +205,12 @@ abstract class ActorSystemTest { */ @Test fun `should allow stopping of child actors`() { - val system = factory(Behavior.setup<Unit> { ctx -> - val ref = ctx.spawn(Behavior.receiveMessage<Unit> { throw UnsupportedOperationException() }, "child") + val system = factory(setup<Unit> { ctx -> + val ref = ctx.spawn(receiveMessage<Unit> { throw UnsupportedOperationException() }, "child") assertTrue(ctx.stop(ref)) assertEquals("child", ref.path.name) - Behavior.ignore() + ignore() }, name = "test") system.run(until = 10.0) @@ -228,15 +221,14 @@ abstract class ActorSystemTest { */ @Test fun `should only be able to terminate child actors`() { - val system = factory(Behavior.setup<Unit> { ctx -> - val child1 = ctx.spawn(Behavior.ignore<Unit>(), "child-1") - - ctx.spawn(Behavior.setup<Unit> { - assertFalse(it.stop(child1)) - Behavior.ignore() + val system = factory(setup<Unit> { ctx1 -> + val child1 = ctx1.spawn(ignore<Unit>(), "child-1") + ctx1.spawn(setup<Unit> { ctx2 -> + assertFalse(ctx2.stop(child1)) + ignore() }, "child-2") - Behavior.ignore() + ignore() }, name = "test") system.run() } @@ -246,11 +238,11 @@ abstract class ActorSystemTest { */ @Test fun `should not be able to stop an already terminated child`() { - val system = factory(Behavior.setup<Unit> { ctx -> - val child = ctx.spawn(Behavior.ignore<Unit>(), "child") + val system = factory(setup<Unit> { ctx -> + val child = ctx.spawn(ignore<Unit>(), "child") ctx.stop(child) assertFalse(ctx.stop(child)) - Behavior.ignore() + ignore() }, name = "test") system.run() } @@ -260,20 +252,20 @@ abstract class ActorSystemTest { */ @Test fun `should terminate children of child when terminating it`() { - val system = factory(Behavior.setup<ActorRef<Unit>> { ctx1 -> - val root = ctx1.self - val child = ctx1.spawn(Behavior.setup<Unit> { - val child = ctx1.spawn(Behavior.receiveMessage<Unit> { + val system = factory(setup<ActorRef<Unit>> { ctx -> + val root = ctx.self + val child = ctx.spawn(setup<Unit> { + val child = ctx.spawn(receiveMessage<Unit> { throw IllegalStateException("DELIBERATE") }, "child") - root.send(child) - Behavior.ignore() + ctx.send(root, child) + ignore() }, "child") - Behavior.receive { ctx2, msg -> - assertTrue(ctx2.stop(child)) - msg.send(Unit) // This actor should be stopped now and not receive the message anymore - Behavior.stopped() + receiveMessage { msg -> + assertTrue(ctx.stop(child)) + ctx.send(msg, Unit) // This actor should be stopped now and not receive the message anymore + stopped() } }, name = "test") @@ -281,19 +273,18 @@ abstract class ActorSystemTest { } /** - * Test whether [Behavior.Companion.same] works correctly. + * Test whether [same] works correctly. */ @Test fun `should keep same behavior on same`() { var counter = 0 - val behavior = Behavior.setup<Unit> { ctx -> - counter++ - ctx.self.send(Unit) - - Behavior.receiveMessage { + val behavior = setup<Unit> { ctx -> + counter++ + ctx.send(ctx.self, Unit) + receiveMessage { counter++ - Behavior.same() + same() } } @@ -308,9 +299,9 @@ abstract class ActorSystemTest { @Test fun `should have reference to itself`() { var flag = false - val behavior: Behavior<Unit> = Behavior.setup { ctx -> - ctx.self.send(Unit) - Behavior.receiveMessage { flag = true; same() } + val behavior: Behavior<Unit> = setup { ctx -> + ctx.send(ctx.self, Unit) + receiveMessage { flag = true; same() } } val system = factory(behavior, "test") @@ -319,20 +310,20 @@ abstract class ActorSystemTest { } /** - * Test whether we cannot start an actor with the [Behavior.Companion.same] behavior. + * Test whether we cannot start an actor with the [same] behavior. */ @Test fun `should not start with same behavior`() { - val system = factory(Behavior.same<Unit>(), "test") + val system = factory(same<Unit>(), "test") assertThrows<IllegalArgumentException> { system.run() } } /** - * Test whether we can start an actor with the [Behavior.Companion.stopped] behavior. + * Test whether we can start an actor with the [stopped] behavior. */ @Test fun `should start with stopped behavior`() { - val system = factory(Behavior.stopped<Unit>(), "test") + val system = factory(stopped<Unit>(), "test") system.run() } @@ -343,7 +334,7 @@ abstract class ActorSystemTest { @Test fun `should stop if it crashes`() { var counter = 0 - val system = factory(Behavior.receiveMessage<Unit> { + val system = factory(receiveMessage<Unit> { counter++ throw IllegalArgumentException("STAGED") }, "test") diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt index 6f97e428..1eb4f3b9 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt @@ -24,7 +24,6 @@ package com.atlarge.odcsim -import com.atlarge.odcsim.dsl.setup import com.atlarge.odcsim.internal.BehaviorInterpreter import com.nhaarman.mockitokotlin2.mock import org.junit.jupiter.api.DisplayName @@ -37,12 +36,12 @@ import org.junit.jupiter.api.assertThrows @DisplayName("Behavior") class BehaviorTest { /** - * Test whether we cannot start an actor with the [Behavior.Companion.unhandled] behavior. + * Test whether we cannot start an actor with the [unhandled] behavior. */ @Test fun `should not start with unhandled behavior`() { val ctx = mock<ActorContext<Unit>>() - val interpreter = BehaviorInterpreter(Behavior.unhandled<Unit>()) + val interpreter = BehaviorInterpreter(unhandled<Unit>()) assertThrows<IllegalArgumentException> { interpreter.start(ctx) } } @@ -52,27 +51,27 @@ class BehaviorTest { @Test fun `should not start with deferred unhandled behavior`() { val ctx = mock<ActorContext<Unit>>() - val interpreter = BehaviorInterpreter(Behavior.setup<Unit> { Behavior.unhandled() }) + val interpreter = BehaviorInterpreter(setup<Unit> { unhandled() }) assertThrows<IllegalArgumentException> { interpreter.start(ctx) } } /** - * Test whether deferred behavior that returns [Behavior.Companion.same] fails. + * Test whether deferred behavior that returns [same] fails. */ @Test fun `should not allow setup to return same`() { val ctx = mock<ActorContext<Unit>>() - val interpreter = BehaviorInterpreter(Behavior.setup<Unit> { Behavior.same() }) + val interpreter = BehaviorInterpreter(setup<Unit> { same() }) assertThrows<IllegalArgumentException> { interpreter.start(ctx) } } /** - * Test whether deferred behavior that returns [Behavior.Companion.unhandled] fails. + * Test whether deferred behavior that returns [unhandled] fails. */ @Test fun `should not allow setup to return unhandled`() { val ctx = mock<ActorContext<Unit>>() - val interpreter = BehaviorInterpreter(Behavior.setup<Unit> { Behavior.unhandled() }) + val interpreter = BehaviorInterpreter(setup<Unit> { unhandled() }) assertThrows<IllegalArgumentException> { interpreter.start(ctx) } } } diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/CoroutinesTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/CoroutinesTest.kt new file mode 100644 index 00000000..d6bf6a74 --- /dev/null +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/CoroutinesTest.kt @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2019 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim + +import com.atlarge.odcsim.coroutines.suspending +import com.atlarge.odcsim.coroutines.SuspendingBehavior +import com.atlarge.odcsim.internal.BehaviorInterpreter +import com.atlarge.odcsim.internal.EmptyBehavior +import com.nhaarman.mockitokotlin2.mock +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +/** + * Test suite for [SuspendingBehavior] using Kotlin Coroutines. + */ +@DisplayName("Coroutines") +internal class CoroutinesTest { + private val ctx = mock<ActorContext<Nothing>>() + + @Test + fun `should immediately return new behavior`() { + val behavior = suspending<Nothing> { empty() } + val interpreter = BehaviorInterpreter(behavior) + interpreter.start(ctx) + assertTrue(interpreter.behavior as Behavior<*> is EmptyBehavior) + } +} |
