summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt149
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt10
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt97
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt58
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt22
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt37
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt48
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt201
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt169
-rw-r--r--odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt19
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt2
11 files changed, 647 insertions, 165 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
index 87b2f28a..1bc41b44 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
@@ -174,164 +174,25 @@ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior
*/
val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior
+// The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending
+// it.
/**
* A special [Behavior] instance that signals that the actor has stopped.
*/
-private object StoppedBehavior : Behavior<Any>() {
+internal object StoppedBehavior : Behavior<Any>() {
override fun toString() = "Stopped"
}
/**
* A special [Behavior] object to signal that the actor wants to reuse its previous behavior.
*/
-private object SameBehavior : Behavior<Nothing>() {
+internal object SameBehavior : Behavior<Nothing>() {
override fun toString() = "Same"
}
/**
* A special [Behavior] object that indicates that the last message or signal was not handled.
*/
-private object UnhandledBehavior : Behavior<Nothing>() {
+internal object UnhandledBehavior : Behavior<Nothing>() {
override fun toString() = "Unhandled"
}
-
-/**
- * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of
- * [Behavior] instances.
- *
- * @param initialBehavior The initial behavior to use.
- */
-class BehaviorInterpreter<T : Any>(initialBehavior: Behavior<T>) {
- /**
- * The current [Behavior] instance.
- */
- var behavior: Behavior<T> = initialBehavior
-
- /**
- * A flag to indicate whether the current behavior is still alive.
- */
- val isAlive: Boolean get() = behavior.isAlive
-
- /**
- * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified
- * context.
- *
- * @param initialBehavior The initial behavior of the actor.
- * @param ctx The [ActorContext] to run the behavior in.
- */
- constructor(initialBehavior: Behavior<T>, ctx: ActorContext<T>) : this(initialBehavior) {
- start(ctx)
- }
-
- /**
- * Start the initial behavior.
- *
- * @param ctx The [ActorContext] to start the behavior in.
- */
- fun start(ctx: ActorContext<T>) {
- behavior = validateAsInitial(start(behavior, ctx))
- }
-
- /**
- * Stop the current active behavior and move into the stopped state.
- *
- * @param ctx The [ActorContext] this takes place in.
- */
- fun stop(ctx: ActorContext<T>) {
- behavior = start(StoppedBehavior.narrow(), ctx)
- }
-
- /**
- * Interpret the given message of type [T] using the current active behavior.
- *
- * @return `true` if the message was handled by the active behavior, `false` otherwise.
- */
- fun interpretMessage(ctx: ActorContext<T>, msg: T): Boolean = interpret(ctx, msg, false)
-
- /**
- * Interpret the given [Signal] using the current active behavior.
- *
- * @return `true` if the signal was handled by the active behavior, `false` otherwise.
- */
- fun interpretSignal(ctx: ActorContext<T>, signal: Signal): Boolean = interpret(ctx, signal, true)
-
- /**
- * Interpret the given message or signal using the current active behavior.
- *
- * @return `true` if the message or signal was handled by the active behavior, `false` otherwise.
- */
- private fun interpret(ctx: ActorContext<T>, msg: Any, isSignal: Boolean): Boolean =
- if (isAlive) {
- val next = when (val current = behavior) {
- is DeferredBehavior<T> ->
- throw IllegalStateException("Deferred [$current] should not be passed to interpreter")
- is ReceivingBehavior<T> ->
- if (isSignal)
- current.receiveSignal(ctx, msg as Signal)
- else
- @Suppress("UNCHECKED_CAST")
- current.receive(ctx, msg as T)
- is SameBehavior, is UnhandledBehavior ->
- throw IllegalStateException("Cannot execute with [$current] as behavior")
- is StoppedBehavior -> current
- }
-
- val unhandled = next.isUnhandled
- behavior = canonicalize(next, behavior, ctx)
- !unhandled
- } else {
- false
- }
-
- /**
- * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if
- * the [Behavior] is not valid.
- *
- * @param behavior The behavior to validate.
- */
- private fun validateAsInitial(behavior: Behavior<T>): Behavior<T> =
- when (behavior) {
- is SameBehavior, is UnhandledBehavior ->
- throw IllegalArgumentException("Cannot use [$behavior] as initial behavior")
- else -> behavior
- }
-
- /**
- * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the
- * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects.
- */
- companion object {
- /**
- * Start the initial behavior of an actor in the specified [ActorContext].
- *
- * This will activate the initial behavior and canonicalize the resulting behavior.
- *
- * @param behavior The initial behavior to start.
- * @param ctx The [ActorContext] to start the behavior in.
- * @return The behavior that has been started.
- */
- tailrec fun <T : Any> start(behavior: Behavior<T>, ctx: ActorContext<T>): Behavior<T> =
- when (behavior) {
- is DeferredBehavior<T> -> start(behavior(ctx), ctx)
- else -> behavior
- }
-
- /**
- * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of
- * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or
- * signal.
- *
- * @param behavior The actor's next behavior.
- * @param current The actor's current behavior.
- * @param ctx The context in which the actor runs.
- * @return The actor's canonicalized next behavior.
- */
- tailrec fun <T : Any> canonicalize(behavior: Behavior<T>, current: Behavior<T>, ctx: ActorContext<T>): Behavior<T> =
- when (behavior) {
- is SameBehavior, current -> current
- is UnhandledBehavior -> current
- is DeferredBehavior<T> -> canonicalize(behavior(ctx), current, ctx)
- else -> behavior
- }
- }
-}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
index 635bf9ac..35aad18b 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
@@ -39,3 +39,13 @@ object PreStart : Signal
* Lifecycle signal that is fired after this actor and all its child actors (transitively) have terminated.
*/
object PostStop : Signal
+
+/**
+ * A [Signal] to indicate an actor has timed out.
+ *
+ * This class contains a [target] property in order to allow nested behavior to function properly when multiple layers
+ * are waiting on this signal.
+ *
+ * @property target The target object that has timed out.
+ */
+class Timeout(val target: Any) : Signal
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt
new file mode 100644
index 00000000..4951d619
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/Behavior.kt
@@ -0,0 +1,97 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.coroutines
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.DeferredBehavior
+import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.internal.SuspendingBehaviorContext
+import com.atlarge.odcsim.internal.SuspendingBehaviorImpl
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.suspendCoroutine
+
+/**
+ * A [Behavior] that allows method calls to suspend execution via Kotlin coroutines.
+ *
+ * @param T The shape of the messages the actor accepts.
+ */
+abstract class SuspendingBehavior<T : Any> : DeferredBehavior<T>() {
+ /**
+ * Run the logic of this behavior.
+ *
+ * @param ctx The [SuspendingActorContext] in which the behavior is executed.
+ */
+ abstract suspend operator fun invoke(ctx: SuspendingActorContext<T>)
+
+ // Immediately transfer to implementation
+ override fun invoke(ctx: ActorContext<T>): Behavior<T> = SuspendingBehaviorImpl(ctx, this)
+}
+
+/**
+ * Construct a [Behavior] that uses Kotlin coroutines functionality to handle incoming messages and signals.
+ */
+fun <T : Any> Behavior.Companion.suspending(block: suspend (SuspendingActorContext<T>) -> Unit): Behavior<T> {
+ return object : SuspendingBehavior<T>() {
+ override suspend fun invoke(ctx: SuspendingActorContext<T>) = block(ctx)
+ }
+}
+
+/**
+ * An [ActorContext] that provides additional functionality for receiving messages and signals from
+ * the actor's mailbox.
+ *
+ * @param T The shape of the messages the actor accepts.
+ */
+interface SuspendingActorContext<T : Any> : ActorContext<T> {
+ /**
+ * Suspend execution of the active coroutine to wait for a message of type [T] to be received in the actor's
+ * mailbox. During suspension, incoming signals will be marked unhandled.
+ *
+ * @return The message of type [T] that has been received.
+ */
+ suspend fun receive(): T
+
+ /**
+ * Suspend execution of the active coroutine to wait for a [Signal] to be received in the actor's mailbox.
+ * During suspension, incoming messages will be marked unhandled.
+ *
+ * @return The [Signal] that has been received.
+ */
+ suspend fun receiveSignal(): Signal
+}
+
+/**
+ * Obtains the current continuation instance inside suspend functions and suspends currently running coroutine. [block]
+ * should return a [Behavior] that will resume the continuation and return the next behavior which is supplied via the
+ * second argument of the block.
+ */
+suspend fun <T : Any, U> suspendWithBehavior(block: (Continuation<U>, () -> Behavior<T>) -> Behavior<T>): U =
+ suspendCoroutine { cont ->
+ @Suppress("UNCHECKED_CAST")
+ val ctx = cont.context[SuspendingBehaviorContext] as? SuspendingBehaviorContext<T>
+ ?: throw UnsupportedOperationException("Coroutine does not run inside SuspendingBehavior")
+ ctx.become(block(cont) { ctx.behavior })
+ }
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
new file mode 100644
index 00000000..832045a4
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
@@ -0,0 +1,58 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.coroutines.dsl
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.Duration
+import com.atlarge.odcsim.ReceivingBehavior
+import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.Timeout
+import com.atlarge.odcsim.coroutines.SuspendingActorContext
+import com.atlarge.odcsim.coroutines.suspendWithBehavior
+import com.atlarge.odcsim.internal.send
+import kotlin.coroutines.resume
+
+/**
+ * Block execution for the specified duration.
+ *
+ * @param after The duration after which execution should continue.
+ */
+suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) =
+ suspendWithBehavior<T, Unit> { cont, next ->
+ object : ReceivingBehavior<T>() {
+ init {
+ self.send(Timeout(this), after)
+ }
+
+ override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> =
+ if (signal is Timeout && signal.target == this) {
+ cont.resume(Unit)
+ next()
+ } else {
+ unhandled()
+ }
+ }
+ }
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt
index 590595d3..d4b092d2 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt
@@ -26,10 +26,12 @@ package com.atlarge.odcsim.dsl
import com.atlarge.odcsim.ActorContext
import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.BehaviorInterpreter
import com.atlarge.odcsim.DeferredBehavior
import com.atlarge.odcsim.ReceivingBehavior
import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.internal.BehaviorInterpreter
+import com.atlarge.odcsim.internal.EmptyBehavior
+import com.atlarge.odcsim.internal.IgnoreBehavior
/**
* A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started.
@@ -46,29 +48,11 @@ fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>):
fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow()
/**
- * A [Behavior] object that ignores all messages sent to the actor.
- */
-private object IgnoreBehavior : ReceivingBehavior<Any>() {
- override fun receive(ctx: ActorContext<Any>, msg: Any): Behavior<Any> = this
-
- override fun receiveSignal(ctx: ActorContext<Any>, signal: Signal): Behavior<Any> = this
-
- override fun toString() = "Ignore"
-}
-
-/**
* A [Behavior] that treats every incoming message or signal as unhandled.
*/
fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow()
/**
- * A [Behavior] object that does not handle any message it receives.
- */
-private object EmptyBehavior : ReceivingBehavior<Any>() {
- override fun toString() = "Empty"
-}
-
-/**
* Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the
* actor's next behavior.
*/
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt
new file mode 100644
index 00000000..7653431c
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/ActorRef.kt
@@ -0,0 +1,37 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.internal
+
+import com.atlarge.odcsim.ActorRef
+import com.atlarge.odcsim.Duration
+import com.atlarge.odcsim.Signal
+
+/**
+ * Send the specified system [Signal] to the given [ActorRef].
+ */
+fun <T : Any> ActorRef<T>.send(signal: Signal, duration: Duration) {
+ @Suppress("UNCHECKED_CAST")
+ send(signal as T, duration)
+}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt
new file mode 100644
index 00000000..b07cabc0
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Behavior.kt
@@ -0,0 +1,48 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.internal
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.ReceivingBehavior
+import com.atlarge.odcsim.Signal
+
+/**
+ * A [Behavior] object that ignores all messages sent to the actor.
+ */
+internal object IgnoreBehavior : ReceivingBehavior<Any>() {
+ override fun receive(ctx: ActorContext<Any>, msg: Any): Behavior<Any> = this
+
+ override fun receiveSignal(ctx: ActorContext<Any>, signal: Signal): Behavior<Any> = this
+
+ override fun toString() = "Ignore"
+}
+
+/**
+ * A [Behavior] object that does not handle any message it receives.
+ */
+internal object EmptyBehavior : ReceivingBehavior<Any>() {
+ override fun toString() = "Empty"
+}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt
new file mode 100644
index 00000000..194c2a62
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/BehaviorInterpreter.kt
@@ -0,0 +1,201 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.internal
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.DeferredBehavior
+import com.atlarge.odcsim.ReceivingBehavior
+import com.atlarge.odcsim.SameBehavior
+import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.StoppedBehavior
+import com.atlarge.odcsim.UnhandledBehavior
+import com.atlarge.odcsim.isAlive
+import com.atlarge.odcsim.isUnhandled
+
+/**
+ * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of
+ * [Behavior] instances.
+ *
+ * @param initialBehavior The initial behavior to use.
+ */
+class BehaviorInterpreter<T : Any>(initialBehavior: Behavior<T>) {
+ /**
+ * The current [Behavior] instance.
+ */
+ var behavior: Behavior<T> = initialBehavior
+ private set
+
+ /**
+ * A flag to indicate the interpreter is still alive.
+ */
+ val isAlive: Boolean get() = behavior.isAlive
+
+ /**
+ * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified
+ * context.
+ *
+ * @param initialBehavior The initial behavior of the actor.
+ * @param ctx The [ActorContext] to run the behavior in.
+ */
+ constructor(initialBehavior: Behavior<T>, ctx: ActorContext<T>) : this(initialBehavior) {
+ start(ctx)
+ }
+
+ /**
+ * Start the initial behavior.
+ *
+ * @param ctx The [ActorContext] to start the behavior in.
+ */
+ fun start(ctx: ActorContext<T>) {
+ behavior = validateAsInitial(start(ctx, behavior))
+ }
+
+ /**
+ * Stop the current active behavior and move into the stopped state.
+ *
+ * @param ctx The [ActorContext] this takes place in.
+ */
+ fun stop(ctx: ActorContext<T>) {
+ behavior = start(ctx, StoppedBehavior.narrow())
+ }
+
+ /**
+ * Replace the current behavior with the specified new behavior.
+ *
+ * @param ctx The [ActorContext] to run the behavior in.
+ * @param next The behavior to replace the current behavior with.
+ */
+ fun become(ctx: ActorContext<T>, next: Behavior<T>) {
+ this.behavior = canonicalize(ctx, behavior, next)
+ }
+
+ /**
+ * Propagate special states of the wrapper [Behavior] to the specified [Behavior]. This means
+ * that if the behavior of this interpreter is stopped or unhandled, this will be propagated.
+ *
+ * @param behavior The [Behavior] to map.
+ * @return Either the specified [Behavior] or the propagated special objects.
+ */
+ fun propagate(behavior: Behavior<T>): Behavior<T> =
+ if (this.behavior.isUnhandled || !this.behavior.isAlive)
+ this.behavior
+ else
+ behavior
+
+ /**
+ * Interpret the given message of type [T] using the current active behavior.
+ *
+ * @return `true` if the message was handled by the active behavior, `false` otherwise.
+ */
+ fun interpretMessage(ctx: ActorContext<T>, msg: T): Boolean = interpret(ctx, msg, false)
+
+ /**
+ * Interpret the given [Signal] using the current active behavior.
+ *
+ * @return `true` if the signal was handled by the active behavior, `false` otherwise.
+ */
+ fun interpretSignal(ctx: ActorContext<T>, signal: Signal): Boolean = interpret(ctx, signal, true)
+
+ /**
+ * Interpret the given message or signal using the current active behavior.
+ *
+ * @return `true` if the message or signal was handled by the active behavior, `false` otherwise.
+ */
+ private fun interpret(ctx: ActorContext<T>, msg: Any, isSignal: Boolean): Boolean =
+ if (isAlive) {
+ val next = when (val current = behavior) {
+ is DeferredBehavior<T> ->
+ throw IllegalStateException("Deferred [$current] should not be passed to interpreter")
+ is ReceivingBehavior<T> ->
+ if (isSignal)
+ current.receiveSignal(ctx, msg as Signal)
+ else
+ @Suppress("UNCHECKED_CAST")
+ current.receive(ctx, msg as T)
+ is SameBehavior, is UnhandledBehavior ->
+ throw IllegalStateException("Cannot execute with [$current] as behavior")
+ is StoppedBehavior -> current
+ }
+
+ val unhandled = next.isUnhandled
+ behavior = canonicalize(ctx, behavior, next)
+ !unhandled
+ } else {
+ false
+ }
+
+ /**
+ * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if
+ * the [Behavior] is not valid.
+ *
+ * @param behavior The behavior to validate.
+ */
+ private fun validateAsInitial(behavior: Behavior<T>): Behavior<T> =
+ when (behavior) {
+ is SameBehavior, is UnhandledBehavior ->
+ throw IllegalArgumentException("Cannot use [$behavior] as initial behavior")
+ else -> behavior
+ }
+
+ /**
+ * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the
+ * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects.
+ */
+ companion object {
+ /**
+ * Start the initial behavior of an actor in the specified [ActorContext].
+ *
+ * This will activate the initial behavior and canonicalize the resulting behavior.
+ *
+ * @param ctx The [ActorContext] to start the behavior in.
+ * @param behavior The initial behavior to start.
+ * @return The behavior that has been started.
+ */
+ tailrec fun <T : Any> start(ctx: ActorContext<T>, behavior: Behavior<T>): Behavior<T> =
+ when (behavior) {
+ is DeferredBehavior<T> -> start(ctx, behavior(ctx))
+ else -> behavior
+ }
+
+ /**
+ * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of
+ * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or
+ * signal.
+ *
+ * @param ctx The context in which the actor runs.
+ * @param current The actor's current behavior.
+ * @param next The actor's next behavior.
+ * @return The actor's canonicalized next behavior.
+ */
+ tailrec fun <T : Any> canonicalize(ctx: ActorContext<T>, current: Behavior<T>, next: Behavior<T>): Behavior<T> =
+ when (next) {
+ is SameBehavior, current -> current
+ is UnhandledBehavior -> current
+ is DeferredBehavior<T> -> canonicalize(ctx, current, next(ctx))
+ else -> next
+ }
+ }
+}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt
new file mode 100644
index 00000000..d8c3e8fc
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt
@@ -0,0 +1,169 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.internal
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.ActorRef
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.Instant
+import com.atlarge.odcsim.ReceivingBehavior
+import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.coroutines.SuspendingActorContext
+import com.atlarge.odcsim.coroutines.SuspendingBehavior
+import com.atlarge.odcsim.dsl.receiveMessage
+import com.atlarge.odcsim.dsl.receiveSignal
+import com.atlarge.odcsim.coroutines.suspendWithBehavior
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.resume
+import kotlin.coroutines.startCoroutine
+
+/**
+ * The interface that is exposed from the [CoroutineContext] and provides control over the [SuspendingBehavior]
+ * instance.
+ */
+interface SuspendingBehaviorContext<T : Any> : CoroutineContext.Element {
+ /**
+ * The [ActorContext] in which the actor currently runs.
+ */
+ val context: SuspendingActorContext<T>
+
+ /**
+ * The current active behavior
+ */
+ val behavior: Behavior<T>
+
+ /**
+ * Replace the current active behavior with the specified new behavior.
+ *
+ * @param next The behavior to replace the current behavior with.
+ */
+ fun become(next: Behavior<T>)
+
+ /**
+ * This key provides users access to an untyped actor context in case the coroutine runs inside a
+ * [SuspendingBehavior].
+ */
+ companion object Key : CoroutineContext.Key<SuspendingBehaviorContext<*>>
+}
+
+/**
+ * Implementation of [SuspendingBehavior] class that maps the suspending method calls to the [Behavior]
+ * interface.
+ * This implementation uses the fact that each actor is thread-safe (as it processes its mailbox sequentially).
+ */
+internal class SuspendingBehaviorImpl<T : Any>(actorContext: ActorContext<T>, initialBehavior: SuspendingBehavior<T>) :
+ ReceivingBehavior<T>(), SuspendingActorContext<T>, SuspendingBehaviorContext<T> {
+ /**
+ * The next behavior to use.
+ */
+ private var next: Behavior<T> = this
+
+ /**
+ * The [BehaviorInterpreter] to wrap the suspending behavior.
+ */
+ private val interpreter = BehaviorInterpreter(initialBehavior)
+
+ /**
+ * The current active [ActorContext].
+ */
+ private var actorContext: ActorContext<T>
+
+ init {
+ this.actorContext = actorContext
+ this.start()
+ }
+
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> {
+ this.actorContext = ctx
+ return interpreter.also { it.interpretMessage(ctx, msg) }.propagate(next)
+ }
+
+ override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> {
+ this.actorContext = ctx
+ return interpreter.also { it.interpretSignal(ctx, signal) }.propagate(next)
+ }
+
+ override val self: ActorRef<T> get() = actorContext.self
+
+ override val time: Instant get() = actorContext.time
+
+ override fun <U : Any> spawn(behavior: Behavior<U>, name: String) = actorContext.spawn(behavior, name)
+
+ override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child)
+
+ override suspend fun receive(): T = suspendWithBehavior<T, T> { cont, next ->
+ receiveMessage { msg ->
+ cont.resume(msg)
+ next()
+ }
+ }
+
+ override suspend fun receiveSignal(): Signal = suspendWithBehavior<T, Signal> { cont, next ->
+ receiveSignal { _, signal ->
+ cont.resume(signal)
+ next()
+ }
+ }
+
+ override val context: SuspendingActorContext<T> get() = this
+
+ override val behavior: Behavior<T> get() = interpreter.behavior
+
+ override fun become(next: Behavior<T>) {
+ interpreter.become(actorContext, next)
+ }
+
+ override val key: CoroutineContext.Key<*> = SuspendingBehaviorContext.Key
+
+ /**
+ * Start the suspending behavior.
+ */
+ private fun start() {
+ val behavior = interpreter.behavior as SuspendingBehavior<T>
+ val block: suspend () -> Unit = { behavior(this) }
+ block.startCoroutine(SuspendingBehaviorImplContinuation())
+ }
+
+ /**
+ * Stop the suspending behavior.
+ */
+ private fun stop() {
+ this.interpreter.stop(actorContext)
+ }
+
+ /**
+ * The continuation of suspending behavior.
+ */
+ private inner class SuspendingBehaviorImplContinuation : Continuation<Unit> {
+ override val context = this@SuspendingBehaviorImpl
+
+ override fun resumeWith(result: Result<Unit>) {
+ // TODO Add some form of error handling here
+ stop()
+ next = Behavior.stopped()
+ }
+ }
+}
diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
index 73b060b9..492dc686 100644
--- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
+++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
@@ -250,7 +250,6 @@ abstract class ActorSystemTest {
Behavior.ignore()
}, "child")
-
Behavior.receive { ctx2, msg ->
assertTrue(ctx2.stop(child))
msg.send(Unit) // This actor should be stopped now and not receive the message anymore
@@ -332,5 +331,23 @@ abstract class ActorSystemTest {
val system = factory(Behavior.stopped<Unit>(), "test")
system.run()
}
+
+ /**
+ * Test whether deferred behavior that returns [Behavior.Companion.same] fails.
+ */
+ @Test
+ fun `should not allow setup to return same`() {
+ val system = factory(Behavior.setup<Unit> { Behavior.same() }, "test")
+ assertThrows<IllegalArgumentException> { system.run() }
+ }
+
+ /**
+ * Test whether deferred behavior that returns [Behavior.Companion.unhandled] fails.
+ */
+ @Test
+ fun `should not allow setup to return unhandled`() {
+ val system = factory(Behavior.setup<Unit> { Behavior.unhandled() }, "test")
+ assertThrows<IllegalArgumentException> { system.run() }
+ }
}
}
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index 3da82b3d..56e3020f 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -29,12 +29,12 @@ import com.atlarge.odcsim.ActorPath
import com.atlarge.odcsim.ActorRef
import com.atlarge.odcsim.ActorSystem
import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.BehaviorInterpreter
import com.atlarge.odcsim.Duration
import com.atlarge.odcsim.Instant
import com.atlarge.odcsim.PostStop
import com.atlarge.odcsim.PreStart
import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.internal.BehaviorInterpreter
import java.util.PriorityQueue
import kotlin.math.max