diff options
7 files changed, 602 insertions, 123 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index 4dea9897..b10287ad 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -29,7 +29,7 @@ package com.atlarge.odcsim * * @param T The shape of the messages the actor accepts. */ -interface ActorContext<in T : Any> { +interface ActorContext<T : Any> { /** * The identity of the actor, bound to the lifecycle of this actor instance. */ diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt index 8d1e069c..f6a9f0b5 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt @@ -41,3 +41,9 @@ interface ActorRef<in T : Any> { */ fun send(msg: T, after: Duration = 0.1) } + +/** + * Unsafe helper method for widening the type accepted by this [ActorRef]. + */ +@Suppress("UNCHECKED_CAST") +fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> = this as ActorRef<U> diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt index 54b02341..87b2f28a 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt @@ -24,21 +24,314 @@ package com.atlarge.odcsim +import com.atlarge.odcsim.dsl.wrap + +/** + * The representation of the behavior of an actor. + * + * Behavior can be formulated using factory methods on the companion object or by extending either [DeferredBehavior] or + * [ReceivingBehavior]. + * + * Users are advised not to close over [ActorContext] within [Behavior], as it will causes it to become immobile, + * meaning it cannot be moved to another context and executed there, and therefore it cannot be replicated or forked + * either. + * + * @param T The shape of the messages the behavior accepts. + */ +sealed class Behavior<T : Any> { + /** + * Narrow the type of this behavior. + * + * This is almost always a safe operation, but might cause [ClassCastException] in case a narrowed behavior sends + * messages of a different type to itself and is chained via [Behavior.orElse]. + */ + fun <U : T> narrow(): Behavior<U> { + @Suppress("UNCHECKED_CAST") + return this as Behavior<U> + } + + /** + * Companion object used to bind factory methods to. + */ + companion object { + /** + * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors + * then these will be stopped as part of the shutdown procedure. + */ + fun <T : Any> stopped(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return StoppedBehavior as Behavior<T> + } + + /** + * This [Behavior] is used to signal that this actor wants to reuse its previous behavior. + */ + fun <T : Any> same(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return SameBehavior as Behavior<T> + } + + /** + * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will + * reuse the previous behavior. + */ + fun <T : Any> unhandled(): Behavior<T> { + @Suppress("UNCHECKED_CAST") + return UnhandledBehavior as Behavior<T> + } + } +} + /** - * The behavior of an actor defines how it reacts to the messages that it receives. + * A [Behavior] that defers the construction of the actual [Behavior] until the actor is started in some [ActorContext]. + * If the actor is already started, it will immediately evaluate. + * + * @param T The shape of the messages the behavior accepts. + */ +abstract class DeferredBehavior<T : Any> : Behavior<T>() { + /** + * Create a [Behavior] instance in the [specified][ctx] [ActorContext]. + * + * @param ctx The [ActorContext] in which the behavior runs. + * @return The actor's next behavior. + */ + abstract operator fun invoke(ctx: ActorContext<T>): Behavior<T> +} + +/** + * A [Behavior] that concretely defines how an actor will react to the messages and signals it receives. * The message may either be of the type that the actor declares and which is part of the [ActorRef] signature, * or it may be a system [Signal] that expresses a lifecycle event of either this actor or one of its child actors. * - * @param T The shape of the messages the behavior accepts. + * @param T The shape of the messages the behavior accepts.* */ -interface Behavior<T : Any> { +abstract class ReceivingBehavior<T : Any> : Behavior<T>() { /** - * Process an incoming message and return the actor's next [Behavior]. + * Process an incoming message of type [T] and return the actor's next behavior. + * + * The returned behavior can in addition to normal behaviors be one of the canned special objects: + * - returning [Behavior.Companion.stopped] will terminate this Behavior + * - returning [Behavior.Companion.same] designates to reuse the current Behavior + * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * + * @param ctx The [ActorContext] in which the actor is currently running. + * @param msg The message that was received. */ - fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = this + open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = Behavior.unhandled() /** - * Process an incoming [Signal] and return the actor's next [Behavior]. + * Process an incoming [Signal] and return the actor's next behavior. + * + * The returned behavior can in addition to normal behaviors be one of the canned special objects: + * - returning [Behavior.Companion.stopped] will terminate this Behavior + * - returning [Behavior.Companion.same] designates to reuse the current Behavior + * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled + * + * @param ctx The [ActorContext] in which the actor is currently running. + * @param signal The [Signal] that was received. */ - fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = this + open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = Behavior.unhandled() +} + +/** + * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the incoming + * message or signal. + */ +fun <T : Any> Behavior<T>.orElse(behavior: Behavior<T>): Behavior<T> = + Behavior.wrap(this) { left -> + Behavior.wrap(behavior) { right -> + object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> { + if (left.interpretMessage(ctx, msg)) { + return left.behavior + } else if (right.interpretMessage(ctx, msg)) { + return right.behavior + } + + return Behavior.unhandled() + } + + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> { + if (left.interpretSignal(ctx, signal)) { + return left.behavior + } else if (right.interpretSignal(ctx, signal)) { + return right.behavior + } + + return Behavior.unhandled() + } + } + } + } + +/** + * A flag to indicate whether a [Behavior] instance is still alive. + */ +val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior + +/** + * A flag to indicate whether the last message/signal went unhandled. + */ +val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior + +/** + * A special [Behavior] instance that signals that the actor has stopped. + */ +private object StoppedBehavior : Behavior<Any>() { + override fun toString() = "Stopped" +} + +/** + * A special [Behavior] object to signal that the actor wants to reuse its previous behavior. + */ +private object SameBehavior : Behavior<Nothing>() { + override fun toString() = "Same" +} + +/** + * A special [Behavior] object that indicates that the last message or signal was not handled. + */ +private object UnhandledBehavior : Behavior<Nothing>() { + override fun toString() = "Unhandled" +} + +/** + * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of + * [Behavior] instances. + * + * @param initialBehavior The initial behavior to use. + */ +class BehaviorInterpreter<T : Any>(initialBehavior: Behavior<T>) { + /** + * The current [Behavior] instance. + */ + var behavior: Behavior<T> = initialBehavior + + /** + * A flag to indicate whether the current behavior is still alive. + */ + val isAlive: Boolean get() = behavior.isAlive + + /** + * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified + * context. + * + * @param initialBehavior The initial behavior of the actor. + * @param ctx The [ActorContext] to run the behavior in. + */ + constructor(initialBehavior: Behavior<T>, ctx: ActorContext<T>) : this(initialBehavior) { + start(ctx) + } + + /** + * Start the initial behavior. + * + * @param ctx The [ActorContext] to start the behavior in. + */ + fun start(ctx: ActorContext<T>) { + behavior = validateAsInitial(start(behavior, ctx)) + } + + /** + * Stop the current active behavior and move into the stopped state. + * + * @param ctx The [ActorContext] this takes place in. + */ + fun stop(ctx: ActorContext<T>) { + behavior = start(StoppedBehavior.narrow(), ctx) + } + + /** + * Interpret the given message of type [T] using the current active behavior. + * + * @return `true` if the message was handled by the active behavior, `false` otherwise. + */ + fun interpretMessage(ctx: ActorContext<T>, msg: T): Boolean = interpret(ctx, msg, false) + + /** + * Interpret the given [Signal] using the current active behavior. + * + * @return `true` if the signal was handled by the active behavior, `false` otherwise. + */ + fun interpretSignal(ctx: ActorContext<T>, signal: Signal): Boolean = interpret(ctx, signal, true) + + /** + * Interpret the given message or signal using the current active behavior. + * + * @return `true` if the message or signal was handled by the active behavior, `false` otherwise. + */ + private fun interpret(ctx: ActorContext<T>, msg: Any, isSignal: Boolean): Boolean = + if (isAlive) { + val next = when (val current = behavior) { + is DeferredBehavior<T> -> + throw IllegalStateException("Deferred [$current] should not be passed to interpreter") + is ReceivingBehavior<T> -> + if (isSignal) + current.receiveSignal(ctx, msg as Signal) + else + @Suppress("UNCHECKED_CAST") + current.receive(ctx, msg as T) + is SameBehavior, is UnhandledBehavior -> + throw IllegalStateException("Cannot execute with [$current] as behavior") + is StoppedBehavior -> current + } + + val unhandled = next.isUnhandled + behavior = canonicalize(next, behavior, ctx) + !unhandled + } else { + false + } + + /** + * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if + * the [Behavior] is not valid. + * + * @param behavior The behavior to validate. + */ + private fun validateAsInitial(behavior: Behavior<T>): Behavior<T> = + when (behavior) { + is SameBehavior, is UnhandledBehavior -> + throw IllegalArgumentException("Cannot use [$behavior] as initial behavior") + else -> behavior + } + + /** + * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the + * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects. + */ + companion object { + /** + * Start the initial behavior of an actor in the specified [ActorContext]. + * + * This will activate the initial behavior and canonicalize the resulting behavior. + * + * @param behavior The initial behavior to start. + * @param ctx The [ActorContext] to start the behavior in. + * @return The behavior that has been started. + */ + tailrec fun <T : Any> start(behavior: Behavior<T>, ctx: ActorContext<T>): Behavior<T> = + when (behavior) { + is DeferredBehavior<T> -> start(behavior(ctx), ctx) + else -> behavior + } + + /** + * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of + * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or + * signal. + * + * @param behavior The actor's next behavior. + * @param current The actor's current behavior. + * @param ctx The context in which the actor runs. + * @return The actor's canonicalized next behavior. + */ + tailrec fun <T : Any> canonicalize(behavior: Behavior<T>, current: Behavior<T>, ctx: ActorContext<T>): Behavior<T> = + when (behavior) { + is SameBehavior, current -> current + is UnhandledBehavior -> current + is DeferredBehavior<T> -> canonicalize(behavior(ctx), current, ctx) + else -> behavior + } + } } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt new file mode 100644 index 00000000..590595d3 --- /dev/null +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt @@ -0,0 +1,106 @@ +/* + * MIT License + * + * Copyright (c) 2018 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim.dsl + +import com.atlarge.odcsim.ActorContext +import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.BehaviorInterpreter +import com.atlarge.odcsim.DeferredBehavior +import com.atlarge.odcsim.ReceivingBehavior +import com.atlarge.odcsim.Signal + +/** + * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started. + */ +fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> { + return object : DeferredBehavior<T>() { + override fun invoke(ctx: ActorContext<T>): Behavior<T> = block(ctx) + } +} + +/** + * A [Behavior] that ignores any incoming message or signal and keeps the same behavior. + */ +fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow() + +/** + * A [Behavior] object that ignores all messages sent to the actor. + */ +private object IgnoreBehavior : ReceivingBehavior<Any>() { + override fun receive(ctx: ActorContext<Any>, msg: Any): Behavior<Any> = this + + override fun receiveSignal(ctx: ActorContext<Any>, signal: Signal): Behavior<Any> = this + + override fun toString() = "Ignore" +} + +/** + * A [Behavior] that treats every incoming message or signal as unhandled. + */ +fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow() + +/** + * A [Behavior] object that does not handle any message it receives. + */ +private object EmptyBehavior : ReceivingBehavior<Any>() { + override fun toString() = "Empty" +} + +/** + * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the + * actor's next behavior. + */ +fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> { + return object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(ctx, msg) + } +} + +/** + * Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior. + */ +fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): Behavior<T> { + return object : ReceivingBehavior<T>() { + override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = onMessage(msg) + } +} + +/** + * Construct a [Behavior] that reacts to incoming signals, provides access to the [ActorContext] and returns the + * actor's next behavior. + */ +fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> { + return object : ReceivingBehavior<T>() { + override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = handler(ctx, signal) + } +} + +/** + * Construct a [Behavior] that wraps another behavior instance and uses a [BehaviorInterpreter] to pass incoming + * messages and signals to the wrapper behavior. + */ +fun <T : Any> Behavior.Companion.wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> { + return Behavior.setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) } +} diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt index 266f9fba..3830f09e 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt @@ -24,6 +24,8 @@ package com.atlarge.odcsim +import com.atlarge.odcsim.dsl.empty +import com.atlarge.odcsim.dsl.setup import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows @@ -44,7 +46,7 @@ abstract class ActorSystemFactoryTest { fun `should create a system with correct name`() { val factory = createFactory() val name = "test" - val system = factory(object : Behavior<Unit> {}, name) + val system = factory(Behavior.empty<Unit>(), name) assertEquals(name, system.name) } @@ -55,11 +57,7 @@ abstract class ActorSystemFactoryTest { @Test fun `should create a system with correct root behavior`() { val factory = createFactory() - val system = factory(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - throw UnsupportedOperationException() - } - }, "test") + val system = factory(Behavior.setup<Unit> { throw UnsupportedOperationException() }, "test") assertThrows<UnsupportedOperationException> { system.run(until = 10.0) } } diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt index 71bc645a..879fe35a 100644 --- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt +++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt @@ -24,6 +24,11 @@ package com.atlarge.odcsim +import com.atlarge.odcsim.dsl.empty +import com.atlarge.odcsim.dsl.ignore +import com.atlarge.odcsim.dsl.receive +import com.atlarge.odcsim.dsl.receiveMessage +import com.atlarge.odcsim.dsl.setup import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue @@ -49,7 +54,7 @@ abstract class ActorSystemTest { @Test fun `should have a name`() { val name = "test" - val system = factory(object : Behavior<Unit> {}, name) + val system = factory(Behavior.empty<Unit>(), name) assertEquals(name, system.name) } @@ -59,7 +64,7 @@ abstract class ActorSystemTest { */ @Test fun `should have a path`() { - val system = factory(object : Behavior<Unit> {}, "test") + val system = factory(Behavior.empty<Unit>(), "test") assertTrue(system.path is ActorPath.Root) } @@ -69,7 +74,7 @@ abstract class ActorSystemTest { */ @Test fun `should start at t=0`() { - val system = factory(object : Behavior<Unit> {}, name = "test") + val system = factory(Behavior.empty<Unit>(), name = "test") assertTrue(Math.abs(system.time) < DELTA) } @@ -79,7 +84,7 @@ abstract class ActorSystemTest { */ @Test fun `should not accept negative instants for running`() { - val system = factory(object : Behavior<Unit> {}, name = "test") + val system = factory(Behavior.empty<Unit>(), name = "test") assertThrows<IllegalArgumentException> { system.run(-10.0) } } @@ -90,7 +95,7 @@ abstract class ActorSystemTest { @Test fun `should not jump backward in time`() { val until = 10.0 - val system = factory(object : Behavior<Unit> {}, name = "test") + val system = factory(Behavior.empty<Unit>(), name = "test") system.run(until = until) system.run(until = until - 0.5) @@ -103,7 +108,7 @@ abstract class ActorSystemTest { @Test fun `should jump forward in time`() { val until = 10.0 - val system = factory(object : Behavior<Unit> {}, name = "test") + val system = factory(Behavior.empty<Unit>(), name = "test") system.run(until = until) assertTrue(Math.abs(system.time - until) < DELTA) @@ -114,15 +119,12 @@ abstract class ActorSystemTest { */ @Test fun `should order messages at the instant by insertion time`() { - val behavior = object : Behavior<Int> { - override fun receive(ctx: ActorContext<Int>, msg: Int): Behavior<Int> { - assertEquals(1, msg) - return object : Behavior<Int> { - override fun receive(ctx: ActorContext<Int>, msg: Int): Behavior<Int> { - assertEquals(2, msg) - return this - } - } + val behavior = Behavior.receiveMessage<Int> { msg -> + assertEquals(1, msg) + + Behavior.receiveMessage { + assertEquals(2, it) + Behavior.ignore() } } val system = factory(behavior, name = "test") @@ -131,6 +133,14 @@ abstract class ActorSystemTest { system.run(until = 10.0) } + /** + * Test whether an [ActorSystem] will not initialize the root actor if the system has not been run yet. + */ + @Test + fun `should not initialize root actor if not run`() { + factory(Behavior.setup<Unit> { TODO() }, name = "test") + } + @Nested @DisplayName("ActorRef") inner class ActorRefTest { @@ -139,7 +149,7 @@ abstract class ActorSystemTest { */ @Test fun `should disallow messages in the past`() { - val system = factory(object : Behavior<Unit> {}, name = "test") + val system = factory(Behavior.empty<Unit>(), name = "test") assertThrows<IllegalArgumentException> { system.send(Unit, after = -1.0) } } } @@ -153,11 +163,9 @@ abstract class ActorSystemTest { */ @Test fun `should pre-start at t=0 if root`() { - val behavior = object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - assertTrue(Math.abs(ctx.time) < DELTA) - return this - } + val behavior = Behavior.setup<Unit> { ctx -> + assertTrue(Math.abs(ctx.time) < DELTA) + Behavior.ignore() } val system = factory(behavior, "test") @@ -169,21 +177,12 @@ abstract class ActorSystemTest { */ @Test fun `should allow spawning of child actors`() { - val behavior = object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - throw UnsupportedOperationException("b") - } - } + val behavior = Behavior.setup<Unit> { throw UnsupportedOperationException("b") } - val system = factory(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - if (signal is PreStart) { - val ref = ctx.spawn(behavior, "child") - assertEquals("child", ref.path.name) - } - - return this - } + val system = factory(Behavior.setup<Unit> { ctx -> + val ref = ctx.spawn(behavior, "child") + assertEquals("child", ref.path.name) + Behavior.ignore() }, name = "test") assertThrows<UnsupportedOperationException> { system.run(until = 10.0) } @@ -194,20 +193,12 @@ abstract class ActorSystemTest { */ @Test fun `should allow stopping of child actors`() { - val system = factory(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - if (signal is PreStart) { - val ref = ctx.spawn(object : Behavior<Unit> { - override fun receive(ctx: ActorContext<Unit>, msg: Unit): Behavior<Unit> { - throw UnsupportedOperationException() - } - }, "child") - assertTrue(ctx.stop(ref)) - assertEquals("child", ref.path.name) - } - - return this - } + val system = factory(Behavior.setup<Unit> { ctx -> + val ref = ctx.spawn(Behavior.receiveMessage<Unit> { throw UnsupportedOperationException() }, "child") + assertTrue(ctx.stop(ref)) + assertEquals("child", ref.path.name) + + Behavior.ignore() }, name = "test") system.run(until = 10.0) @@ -218,17 +209,15 @@ abstract class ActorSystemTest { */ @Test fun `should only be able to terminate child actors`() { - val system = factory(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - val child1 = ctx.spawn(object : Behavior<Unit> {}, "child-1") - ctx.spawn(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - assertFalse(ctx.stop(child1)) - return this - } - }, "child-2") - return this - } + val system = factory(Behavior.setup<Unit> { ctx -> + val child1 = ctx.spawn(Behavior.ignore<Unit>(), "child-1") + + ctx.spawn(Behavior.setup<Unit> { + assertFalse(it.stop(child1)) + Behavior.ignore() + }, "child-2") + + Behavior.ignore() }, name = "test") system.run() } @@ -238,13 +227,11 @@ abstract class ActorSystemTest { */ @Test fun `should not be able to stop an already terminated child`() { - val system = factory(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - val child = ctx.spawn(object : Behavior<Unit> {}, "child") - ctx.stop(child) - assertFalse(ctx.stop(child)) - return this - } + val system = factory(Behavior.setup<Unit> { ctx -> + val child = ctx.spawn(Behavior.ignore<Unit>(), "child") + ctx.stop(child) + assertFalse(ctx.stop(child)) + Behavior.ignore() }, name = "test") system.run() } @@ -254,53 +241,98 @@ abstract class ActorSystemTest { */ @Test fun `should terminate children of child when terminating it`() { - val system = factory(object : Behavior<ActorRef<Unit>> { - lateinit var child: ActorRef<Unit> + val system = factory(Behavior.setup<ActorRef<Unit>> { ctx1 -> + val root = ctx1.self + val child = ctx1.spawn(Behavior.setup<Unit> { + val child = ctx1.spawn(Behavior.receiveMessage<Unit> { + throw IllegalStateException("DELIBERATE") + }, "child") + root.send(child) + Behavior.ignore() + }, "child") - override fun receive(ctx: ActorContext<ActorRef<Unit>>, msg: ActorRef<Unit>): Behavior<ActorRef<Unit>> { - assertTrue(ctx.stop(child)) + + Behavior.receive { ctx2, msg -> + assertTrue(ctx2.stop(child)) msg.send(Unit) // This actor should be stopped now and not receive the message anymore - return this + Behavior.stopped() } - override fun receiveSignal(ctx: ActorContext<ActorRef<Unit>>, signal: Signal): Behavior<ActorRef<Unit>> { - val root = ctx.self - child = ctx.spawn(object : Behavior<Unit> { - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { - if (signal is PreStart) { - val child = ctx.spawn(object : Behavior<Unit> { - override fun receive(ctx: ActorContext<Unit>, msg: Unit): Behavior<Unit> { - throw IllegalStateException() - } - }, "child") - root.send(child) - } - return this - } - }, "child") - return this - } }, name = "test") + system.run() } /** - * Test whether the reference to the actor itself is valid. + * Test whether [Behavior.Companion.same] works correctly. */ @Test - fun `should have reference to itself`() { - val behavior = object : Behavior<Unit> { - override fun receive(ctx: ActorContext<Unit>, msg: Unit): Behavior<Unit> { - throw UnsupportedOperationException() - } - override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> { + fun `should keep same behavior on same`() { + var counter = 0 + + val behavior = Behavior.setup<Unit> { ctx -> + counter++ ctx.self.send(Unit) - return this + + Behavior.receiveMessage { + counter++ + Behavior.same() } } val system = factory(behavior, "test") + system.run() + assertEquals(2, counter) + } + + /** + * Test whether the reference to the actor itself is valid. + */ + @Test + fun `should have reference to itself`() { + val behavior: Behavior<Unit> = Behavior.setup { ctx -> + ctx.self.send(Unit) + Behavior.receiveMessage { throw UnsupportedOperationException() } + } + + val system = factory(behavior, "test") assertThrows<UnsupportedOperationException> { system.run() } } + + /** + * Test whether we cannot start an actor with the [Behavior.Companion.same] behavior. + */ + @Test + fun `should not start with same behavior`() { + val system = factory(Behavior.same<Unit>(), "test") + assertThrows<IllegalArgumentException> { system.run() } + } + + /** + * Test whether we cannot start an actor with the [Behavior.Companion.unhandled] behavior. + */ + @Test + fun `should not start with unhandled behavior`() { + val system = factory(Behavior.unhandled<Unit>(), "test") + assertThrows<IllegalArgumentException> { system.run() } + } + + /** + * Test whether we cannot start an actor with deferred unhandled behavior. + */ + @Test + fun `should not start with deferred unhandled behavior`() { + val system = factory(Behavior.setup<Unit> { Behavior.unhandled() }, "test") + assertThrows<IllegalArgumentException> { system.run() } + } + + /** + * Test whether we can start an actor with the [Behavior.Companion.stopped] behavior. + */ + @Test + fun `should start with stopped behavior`() { + val system = factory(Behavior.stopped<Unit>(), "test") + system.run() + } } } diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index 3eaddf51..3da82b3d 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -29,6 +29,7 @@ import com.atlarge.odcsim.ActorPath import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.ActorSystem import com.atlarge.odcsim.Behavior +import com.atlarge.odcsim.BehaviorInterpreter import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Instant import com.atlarge.odcsim.PostStop @@ -58,6 +59,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override val path: ActorPath = ActorPath.Root(name = "/user") /** + * A flag to indicate the system has started. + */ + private var isStarted: Boolean = false + + /** * The event queue to process */ private val queue: PriorityQueue<Envelope> = PriorityQueue(Comparator @@ -69,10 +75,21 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) */ private val registry: MutableMap<ActorPath, Actor<*>> = HashMap() + init { + registry[path] = Actor(this, root) + schedule(this, PreStart, .0) + } + override fun run(until: Duration) { require(until >= .0) { "The given instant must be a non-negative number" } - while (true) { + // Start the root actor on initial run + if (!isStarted) { + registry[path]!!.start() + isStarted = true + } + + while (time < until) { val envelope = queue.peek() ?: break val delivery = envelope.time.takeUnless { it > until } ?: break @@ -99,13 +116,15 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) */ private var nextId: Long = 0 - init { - registry[path] = Actor(this, root) - schedule(this, PreStart, .0) - } - - private inner class Actor<T : Any>(override val self: ActorRef<T>, var behavior: Behavior<T>) : ActorContext<T> { + /** + * An actor as represented in the Omega engine. + * + * @param self The [ActorRef] to this actor. + * @param initialBehavior The initial behavior of this actor. + */ + private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> { val children: MutableSet<Actor<*>> = mutableSetOf() + val interpreter = BehaviorInterpreter(initialBehavior) override val time: Instant get() = this@OmegaActorSystem.time @@ -117,6 +136,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) registry[ref.path] = actor children += actor schedule(ref, PreStart, .0) + actor.start() } return ref } @@ -127,25 +147,49 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) return false } val ref = registry[child.path] ?: return false - ref.terminate() + ref.stop() return true } /** + * Start this actor. + */ + fun start() { + interpreter.start(this) + } + + /** + * Stop this actor. + */ + fun stop() { + interpreter.stop(this) + terminate() + interpreter.interpretSignal(this, PostStop) + } + + /** * Terminate this actor and its children. */ fun terminate() { children.forEach { it.terminate() } registry.remove(self.path) - interpretMessage(PostStop) } /** - * Interpret the given message send to an actor. Make sure the message is of the correct type. + * Interpret the given message send to an actor. */ fun interpretMessage(msg: Any) { - @Suppress("UNCHECKED_CAST") - behavior = if (msg is Signal) behavior.receiveSignal(this, msg) else behavior.receive(this, msg as T) + if (msg is Signal) { + interpreter.interpretSignal(this, msg) + } else { + @Suppress("UNCHECKED_CAST") + interpreter.interpretMessage(this, msg as T) + } + + if (!interpreter.isAlive) { + terminate() + interpreter.interpretSignal(this, PostStop) + } } override fun equals(other: Any?): Boolean = |
