summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt2
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt6
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt307
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt106
-rw-r--r--odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt10
-rw-r--r--odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt226
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt68
7 files changed, 602 insertions, 123 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt
index 4dea9897..b10287ad 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt
@@ -29,7 +29,7 @@ package com.atlarge.odcsim
*
* @param T The shape of the messages the actor accepts.
*/
-interface ActorContext<in T : Any> {
+interface ActorContext<T : Any> {
/**
* The identity of the actor, bound to the lifecycle of this actor instance.
*/
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt
index 8d1e069c..f6a9f0b5 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorRef.kt
@@ -41,3 +41,9 @@ interface ActorRef<in T : Any> {
*/
fun send(msg: T, after: Duration = 0.1)
}
+
+/**
+ * Unsafe helper method for widening the type accepted by this [ActorRef].
+ */
+@Suppress("UNCHECKED_CAST")
+fun <U : Any, T : U> ActorRef<T>.upcast(): ActorRef<U> = this as ActorRef<U>
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
index 54b02341..87b2f28a 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
@@ -24,21 +24,314 @@
package com.atlarge.odcsim
+import com.atlarge.odcsim.dsl.wrap
+
+/**
+ * The representation of the behavior of an actor.
+ *
+ * Behavior can be formulated using factory methods on the companion object or by extending either [DeferredBehavior] or
+ * [ReceivingBehavior].
+ *
+ * Users are advised not to close over [ActorContext] within [Behavior], as it will causes it to become immobile,
+ * meaning it cannot be moved to another context and executed there, and therefore it cannot be replicated or forked
+ * either.
+ *
+ * @param T The shape of the messages the behavior accepts.
+ */
+sealed class Behavior<T : Any> {
+ /**
+ * Narrow the type of this behavior.
+ *
+ * This is almost always a safe operation, but might cause [ClassCastException] in case a narrowed behavior sends
+ * messages of a different type to itself and is chained via [Behavior.orElse].
+ */
+ fun <U : T> narrow(): Behavior<U> {
+ @Suppress("UNCHECKED_CAST")
+ return this as Behavior<U>
+ }
+
+ /**
+ * Companion object used to bind factory methods to.
+ */
+ companion object {
+ /**
+ * This [Behavior] is used to signal that this actor shall terminate voluntarily. If this actor has created child actors
+ * then these will be stopped as part of the shutdown procedure.
+ */
+ fun <T : Any> stopped(): Behavior<T> {
+ @Suppress("UNCHECKED_CAST")
+ return StoppedBehavior as Behavior<T>
+ }
+
+ /**
+ * This [Behavior] is used to signal that this actor wants to reuse its previous behavior.
+ */
+ fun <T : Any> same(): Behavior<T> {
+ @Suppress("UNCHECKED_CAST")
+ return SameBehavior as Behavior<T>
+ }
+
+ /**
+ * This [Behavior] is used to signal to the system that the last message or signal went unhandled. This will
+ * reuse the previous behavior.
+ */
+ fun <T : Any> unhandled(): Behavior<T> {
+ @Suppress("UNCHECKED_CAST")
+ return UnhandledBehavior as Behavior<T>
+ }
+ }
+}
+
/**
- * The behavior of an actor defines how it reacts to the messages that it receives.
+ * A [Behavior] that defers the construction of the actual [Behavior] until the actor is started in some [ActorContext].
+ * If the actor is already started, it will immediately evaluate.
+ *
+ * @param T The shape of the messages the behavior accepts.
+ */
+abstract class DeferredBehavior<T : Any> : Behavior<T>() {
+ /**
+ * Create a [Behavior] instance in the [specified][ctx] [ActorContext].
+ *
+ * @param ctx The [ActorContext] in which the behavior runs.
+ * @return The actor's next behavior.
+ */
+ abstract operator fun invoke(ctx: ActorContext<T>): Behavior<T>
+}
+
+/**
+ * A [Behavior] that concretely defines how an actor will react to the messages and signals it receives.
* The message may either be of the type that the actor declares and which is part of the [ActorRef] signature,
* or it may be a system [Signal] that expresses a lifecycle event of either this actor or one of its child actors.
*
- * @param T The shape of the messages the behavior accepts.
+ * @param T The shape of the messages the behavior accepts.*
*/
-interface Behavior<T : Any> {
+abstract class ReceivingBehavior<T : Any> : Behavior<T>() {
/**
- * Process an incoming message and return the actor's next [Behavior].
+ * Process an incoming message of type [T] and return the actor's next behavior.
+ *
+ * The returned behavior can in addition to normal behaviors be one of the canned special objects:
+ * - returning [Behavior.Companion.stopped] will terminate this Behavior
+ * - returning [Behavior.Companion.same] designates to reuse the current Behavior
+ * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled
+ *
+ * @param ctx The [ActorContext] in which the actor is currently running.
+ * @param msg The message that was received.
*/
- fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = this
+ open fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = Behavior.unhandled()
/**
- * Process an incoming [Signal] and return the actor's next [Behavior].
+ * Process an incoming [Signal] and return the actor's next behavior.
+ *
+ * The returned behavior can in addition to normal behaviors be one of the canned special objects:
+ * - returning [Behavior.Companion.stopped] will terminate this Behavior
+ * - returning [Behavior.Companion.same] designates to reuse the current Behavior
+ * - returning [Behavior.Companion.unhandled] keeps the same Behavior and signals that the message was not yet handled
+ *
+ * @param ctx The [ActorContext] in which the actor is currently running.
+ * @param signal The [Signal] that was received.
*/
- fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = this
+ open fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = Behavior.unhandled()
+}
+
+/**
+ * Compose this [Behavior] with a fallback [Behavior] which is used in case this [Behavior] does not handle the incoming
+ * message or signal.
+ */
+fun <T : Any> Behavior<T>.orElse(behavior: Behavior<T>): Behavior<T> =
+ Behavior.wrap(this) { left ->
+ Behavior.wrap(behavior) { right ->
+ object : ReceivingBehavior<T>() {
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> {
+ if (left.interpretMessage(ctx, msg)) {
+ return left.behavior
+ } else if (right.interpretMessage(ctx, msg)) {
+ return right.behavior
+ }
+
+ return Behavior.unhandled()
+ }
+
+ override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> {
+ if (left.interpretSignal(ctx, signal)) {
+ return left.behavior
+ } else if (right.interpretSignal(ctx, signal)) {
+ return right.behavior
+ }
+
+ return Behavior.unhandled()
+ }
+ }
+ }
+ }
+
+/**
+ * A flag to indicate whether a [Behavior] instance is still alive.
+ */
+val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior
+
+/**
+ * A flag to indicate whether the last message/signal went unhandled.
+ */
+val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior
+
+/**
+ * A special [Behavior] instance that signals that the actor has stopped.
+ */
+private object StoppedBehavior : Behavior<Any>() {
+ override fun toString() = "Stopped"
+}
+
+/**
+ * A special [Behavior] object to signal that the actor wants to reuse its previous behavior.
+ */
+private object SameBehavior : Behavior<Nothing>() {
+ override fun toString() = "Same"
+}
+
+/**
+ * A special [Behavior] object that indicates that the last message or signal was not handled.
+ */
+private object UnhandledBehavior : Behavior<Nothing>() {
+ override fun toString() = "Unhandled"
+}
+
+/**
+ * Helper class that interprets messages/signals, canonicalizes special objects and manages the life-cycle of
+ * [Behavior] instances.
+ *
+ * @param initialBehavior The initial behavior to use.
+ */
+class BehaviorInterpreter<T : Any>(initialBehavior: Behavior<T>) {
+ /**
+ * The current [Behavior] instance.
+ */
+ var behavior: Behavior<T> = initialBehavior
+
+ /**
+ * A flag to indicate whether the current behavior is still alive.
+ */
+ val isAlive: Boolean get() = behavior.isAlive
+
+ /**
+ * Construct a [BehaviorInterpreter] with the specified initial behavior and immediately start it in the specified
+ * context.
+ *
+ * @param initialBehavior The initial behavior of the actor.
+ * @param ctx The [ActorContext] to run the behavior in.
+ */
+ constructor(initialBehavior: Behavior<T>, ctx: ActorContext<T>) : this(initialBehavior) {
+ start(ctx)
+ }
+
+ /**
+ * Start the initial behavior.
+ *
+ * @param ctx The [ActorContext] to start the behavior in.
+ */
+ fun start(ctx: ActorContext<T>) {
+ behavior = validateAsInitial(start(behavior, ctx))
+ }
+
+ /**
+ * Stop the current active behavior and move into the stopped state.
+ *
+ * @param ctx The [ActorContext] this takes place in.
+ */
+ fun stop(ctx: ActorContext<T>) {
+ behavior = start(StoppedBehavior.narrow(), ctx)
+ }
+
+ /**
+ * Interpret the given message of type [T] using the current active behavior.
+ *
+ * @return `true` if the message was handled by the active behavior, `false` otherwise.
+ */
+ fun interpretMessage(ctx: ActorContext<T>, msg: T): Boolean = interpret(ctx, msg, false)
+
+ /**
+ * Interpret the given [Signal] using the current active behavior.
+ *
+ * @return `true` if the signal was handled by the active behavior, `false` otherwise.
+ */
+ fun interpretSignal(ctx: ActorContext<T>, signal: Signal): Boolean = interpret(ctx, signal, true)
+
+ /**
+ * Interpret the given message or signal using the current active behavior.
+ *
+ * @return `true` if the message or signal was handled by the active behavior, `false` otherwise.
+ */
+ private fun interpret(ctx: ActorContext<T>, msg: Any, isSignal: Boolean): Boolean =
+ if (isAlive) {
+ val next = when (val current = behavior) {
+ is DeferredBehavior<T> ->
+ throw IllegalStateException("Deferred [$current] should not be passed to interpreter")
+ is ReceivingBehavior<T> ->
+ if (isSignal)
+ current.receiveSignal(ctx, msg as Signal)
+ else
+ @Suppress("UNCHECKED_CAST")
+ current.receive(ctx, msg as T)
+ is SameBehavior, is UnhandledBehavior ->
+ throw IllegalStateException("Cannot execute with [$current] as behavior")
+ is StoppedBehavior -> current
+ }
+
+ val unhandled = next.isUnhandled
+ behavior = canonicalize(next, behavior, ctx)
+ !unhandled
+ } else {
+ false
+ }
+
+ /**
+ * Validate whether the given [Behavior] can be used as initial behavior. Throw an [IllegalArgumentException] if
+ * the [Behavior] is not valid.
+ *
+ * @param behavior The behavior to validate.
+ */
+ private fun validateAsInitial(behavior: Behavior<T>): Behavior<T> =
+ when (behavior) {
+ is SameBehavior, is UnhandledBehavior ->
+ throw IllegalArgumentException("Cannot use [$behavior] as initial behavior")
+ else -> behavior
+ }
+
+ /**
+ * Helper methods to properly manage the special, canned behavior objects. It highly recommended to use the
+ * [BehaviorInterpreter] instead to properly manage the life-cycles of the behavior objects.
+ */
+ companion object {
+ /**
+ * Start the initial behavior of an actor in the specified [ActorContext].
+ *
+ * This will activate the initial behavior and canonicalize the resulting behavior.
+ *
+ * @param behavior The initial behavior to start.
+ * @param ctx The [ActorContext] to start the behavior in.
+ * @return The behavior that has been started.
+ */
+ tailrec fun <T : Any> start(behavior: Behavior<T>, ctx: ActorContext<T>): Behavior<T> =
+ when (behavior) {
+ is DeferredBehavior<T> -> start(behavior(ctx), ctx)
+ else -> behavior
+ }
+
+ /**
+ * Given a possibly special behavior (same or unhandled) and a "current" behavior (which defines the meaning of
+ * encountering a `same` behavior) this method computes the next behavior, suitable for passing a message or
+ * signal.
+ *
+ * @param behavior The actor's next behavior.
+ * @param current The actor's current behavior.
+ * @param ctx The context in which the actor runs.
+ * @return The actor's canonicalized next behavior.
+ */
+ tailrec fun <T : Any> canonicalize(behavior: Behavior<T>, current: Behavior<T>, ctx: ActorContext<T>): Behavior<T> =
+ when (behavior) {
+ is SameBehavior, current -> current
+ is UnhandledBehavior -> current
+ is DeferredBehavior<T> -> canonicalize(behavior(ctx), current, ctx)
+ else -> behavior
+ }
+ }
}
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt
new file mode 100644
index 00000000..590595d3
--- /dev/null
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/dsl/Behaviors.kt
@@ -0,0 +1,106 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim.dsl
+
+import com.atlarge.odcsim.ActorContext
+import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.BehaviorInterpreter
+import com.atlarge.odcsim.DeferredBehavior
+import com.atlarge.odcsim.ReceivingBehavior
+import com.atlarge.odcsim.Signal
+
+/**
+ * A factory for [Behavior]. Creation of the behavior instance is deferred until the actor is started.
+ */
+fun <T : Any> Behavior.Companion.setup(block: (ActorContext<T>) -> Behavior<T>): Behavior<T> {
+ return object : DeferredBehavior<T>() {
+ override fun invoke(ctx: ActorContext<T>): Behavior<T> = block(ctx)
+ }
+}
+
+/**
+ * A [Behavior] that ignores any incoming message or signal and keeps the same behavior.
+ */
+fun <T : Any> Behavior.Companion.ignore(): Behavior<T> = IgnoreBehavior.narrow()
+
+/**
+ * A [Behavior] object that ignores all messages sent to the actor.
+ */
+private object IgnoreBehavior : ReceivingBehavior<Any>() {
+ override fun receive(ctx: ActorContext<Any>, msg: Any): Behavior<Any> = this
+
+ override fun receiveSignal(ctx: ActorContext<Any>, signal: Signal): Behavior<Any> = this
+
+ override fun toString() = "Ignore"
+}
+
+/**
+ * A [Behavior] that treats every incoming message or signal as unhandled.
+ */
+fun <T : Any> Behavior.Companion.empty(): Behavior<T> = EmptyBehavior.narrow()
+
+/**
+ * A [Behavior] object that does not handle any message it receives.
+ */
+private object EmptyBehavior : ReceivingBehavior<Any>() {
+ override fun toString() = "Empty"
+}
+
+/**
+ * Construct a [Behavior] that reacts to incoming messages, provides access to the [ActorContext] and returns the
+ * actor's next behavior.
+ */
+fun <T : Any> Behavior.Companion.receive(handler: (ActorContext<T>, T) -> Behavior<T>): Behavior<T> {
+ return object : ReceivingBehavior<T>() {
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = handler(ctx, msg)
+ }
+}
+
+/**
+ * Construct a [Behavior] that reacts to incoming messages and returns the actor's next behavior.
+ */
+fun <T : Any> Behavior.Companion.receiveMessage(onMessage: (T) -> Behavior<T>): Behavior<T> {
+ return object : ReceivingBehavior<T>() {
+ override fun receive(ctx: ActorContext<T>, msg: T): Behavior<T> = onMessage(msg)
+ }
+}
+
+/**
+ * Construct a [Behavior] that reacts to incoming signals, provides access to the [ActorContext] and returns the
+ * actor's next behavior.
+ */
+fun <T : Any> Behavior.Companion.receiveSignal(handler: (ActorContext<T>, Signal) -> Behavior<T>): Behavior<T> {
+ return object : ReceivingBehavior<T>() {
+ override fun receiveSignal(ctx: ActorContext<T>, signal: Signal): Behavior<T> = handler(ctx, signal)
+ }
+}
+
+/**
+ * Construct a [Behavior] that wraps another behavior instance and uses a [BehaviorInterpreter] to pass incoming
+ * messages and signals to the wrapper behavior.
+ */
+fun <T : Any> Behavior.Companion.wrap(behavior: Behavior<T>, wrap: (BehaviorInterpreter<T>) -> Behavior<T>): Behavior<T> {
+ return Behavior.setup { ctx -> wrap(BehaviorInterpreter(behavior, ctx)) }
+}
diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt
index 266f9fba..3830f09e 100644
--- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt
+++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemFactoryTest.kt
@@ -24,6 +24,8 @@
package com.atlarge.odcsim
+import com.atlarge.odcsim.dsl.empty
+import com.atlarge.odcsim.dsl.setup
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
@@ -44,7 +46,7 @@ abstract class ActorSystemFactoryTest {
fun `should create a system with correct name`() {
val factory = createFactory()
val name = "test"
- val system = factory(object : Behavior<Unit> {}, name)
+ val system = factory(Behavior.empty<Unit>(), name)
assertEquals(name, system.name)
}
@@ -55,11 +57,7 @@ abstract class ActorSystemFactoryTest {
@Test
fun `should create a system with correct root behavior`() {
val factory = createFactory()
- val system = factory(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- throw UnsupportedOperationException()
- }
- }, "test")
+ val system = factory(Behavior.setup<Unit> { throw UnsupportedOperationException() }, "test")
assertThrows<UnsupportedOperationException> { system.run(until = 10.0) }
}
diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
index 71bc645a..879fe35a 100644
--- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
+++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
@@ -24,6 +24,11 @@
package com.atlarge.odcsim
+import com.atlarge.odcsim.dsl.empty
+import com.atlarge.odcsim.dsl.ignore
+import com.atlarge.odcsim.dsl.receive
+import com.atlarge.odcsim.dsl.receiveMessage
+import com.atlarge.odcsim.dsl.setup
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
@@ -49,7 +54,7 @@ abstract class ActorSystemTest {
@Test
fun `should have a name`() {
val name = "test"
- val system = factory(object : Behavior<Unit> {}, name)
+ val system = factory(Behavior.empty<Unit>(), name)
assertEquals(name, system.name)
}
@@ -59,7 +64,7 @@ abstract class ActorSystemTest {
*/
@Test
fun `should have a path`() {
- val system = factory(object : Behavior<Unit> {}, "test")
+ val system = factory(Behavior.empty<Unit>(), "test")
assertTrue(system.path is ActorPath.Root)
}
@@ -69,7 +74,7 @@ abstract class ActorSystemTest {
*/
@Test
fun `should start at t=0`() {
- val system = factory(object : Behavior<Unit> {}, name = "test")
+ val system = factory(Behavior.empty<Unit>(), name = "test")
assertTrue(Math.abs(system.time) < DELTA)
}
@@ -79,7 +84,7 @@ abstract class ActorSystemTest {
*/
@Test
fun `should not accept negative instants for running`() {
- val system = factory(object : Behavior<Unit> {}, name = "test")
+ val system = factory(Behavior.empty<Unit>(), name = "test")
assertThrows<IllegalArgumentException> { system.run(-10.0) }
}
@@ -90,7 +95,7 @@ abstract class ActorSystemTest {
@Test
fun `should not jump backward in time`() {
val until = 10.0
- val system = factory(object : Behavior<Unit> {}, name = "test")
+ val system = factory(Behavior.empty<Unit>(), name = "test")
system.run(until = until)
system.run(until = until - 0.5)
@@ -103,7 +108,7 @@ abstract class ActorSystemTest {
@Test
fun `should jump forward in time`() {
val until = 10.0
- val system = factory(object : Behavior<Unit> {}, name = "test")
+ val system = factory(Behavior.empty<Unit>(), name = "test")
system.run(until = until)
assertTrue(Math.abs(system.time - until) < DELTA)
@@ -114,15 +119,12 @@ abstract class ActorSystemTest {
*/
@Test
fun `should order messages at the instant by insertion time`() {
- val behavior = object : Behavior<Int> {
- override fun receive(ctx: ActorContext<Int>, msg: Int): Behavior<Int> {
- assertEquals(1, msg)
- return object : Behavior<Int> {
- override fun receive(ctx: ActorContext<Int>, msg: Int): Behavior<Int> {
- assertEquals(2, msg)
- return this
- }
- }
+ val behavior = Behavior.receiveMessage<Int> { msg ->
+ assertEquals(1, msg)
+
+ Behavior.receiveMessage {
+ assertEquals(2, it)
+ Behavior.ignore()
}
}
val system = factory(behavior, name = "test")
@@ -131,6 +133,14 @@ abstract class ActorSystemTest {
system.run(until = 10.0)
}
+ /**
+ * Test whether an [ActorSystem] will not initialize the root actor if the system has not been run yet.
+ */
+ @Test
+ fun `should not initialize root actor if not run`() {
+ factory(Behavior.setup<Unit> { TODO() }, name = "test")
+ }
+
@Nested
@DisplayName("ActorRef")
inner class ActorRefTest {
@@ -139,7 +149,7 @@ abstract class ActorSystemTest {
*/
@Test
fun `should disallow messages in the past`() {
- val system = factory(object : Behavior<Unit> {}, name = "test")
+ val system = factory(Behavior.empty<Unit>(), name = "test")
assertThrows<IllegalArgumentException> { system.send(Unit, after = -1.0) }
}
}
@@ -153,11 +163,9 @@ abstract class ActorSystemTest {
*/
@Test
fun `should pre-start at t=0 if root`() {
- val behavior = object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- assertTrue(Math.abs(ctx.time) < DELTA)
- return this
- }
+ val behavior = Behavior.setup<Unit> { ctx ->
+ assertTrue(Math.abs(ctx.time) < DELTA)
+ Behavior.ignore()
}
val system = factory(behavior, "test")
@@ -169,21 +177,12 @@ abstract class ActorSystemTest {
*/
@Test
fun `should allow spawning of child actors`() {
- val behavior = object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- throw UnsupportedOperationException("b")
- }
- }
+ val behavior = Behavior.setup<Unit> { throw UnsupportedOperationException("b") }
- val system = factory(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- if (signal is PreStart) {
- val ref = ctx.spawn(behavior, "child")
- assertEquals("child", ref.path.name)
- }
-
- return this
- }
+ val system = factory(Behavior.setup<Unit> { ctx ->
+ val ref = ctx.spawn(behavior, "child")
+ assertEquals("child", ref.path.name)
+ Behavior.ignore()
}, name = "test")
assertThrows<UnsupportedOperationException> { system.run(until = 10.0) }
@@ -194,20 +193,12 @@ abstract class ActorSystemTest {
*/
@Test
fun `should allow stopping of child actors`() {
- val system = factory(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- if (signal is PreStart) {
- val ref = ctx.spawn(object : Behavior<Unit> {
- override fun receive(ctx: ActorContext<Unit>, msg: Unit): Behavior<Unit> {
- throw UnsupportedOperationException()
- }
- }, "child")
- assertTrue(ctx.stop(ref))
- assertEquals("child", ref.path.name)
- }
-
- return this
- }
+ val system = factory(Behavior.setup<Unit> { ctx ->
+ val ref = ctx.spawn(Behavior.receiveMessage<Unit> { throw UnsupportedOperationException() }, "child")
+ assertTrue(ctx.stop(ref))
+ assertEquals("child", ref.path.name)
+
+ Behavior.ignore()
}, name = "test")
system.run(until = 10.0)
@@ -218,17 +209,15 @@ abstract class ActorSystemTest {
*/
@Test
fun `should only be able to terminate child actors`() {
- val system = factory(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- val child1 = ctx.spawn(object : Behavior<Unit> {}, "child-1")
- ctx.spawn(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- assertFalse(ctx.stop(child1))
- return this
- }
- }, "child-2")
- return this
- }
+ val system = factory(Behavior.setup<Unit> { ctx ->
+ val child1 = ctx.spawn(Behavior.ignore<Unit>(), "child-1")
+
+ ctx.spawn(Behavior.setup<Unit> {
+ assertFalse(it.stop(child1))
+ Behavior.ignore()
+ }, "child-2")
+
+ Behavior.ignore()
}, name = "test")
system.run()
}
@@ -238,13 +227,11 @@ abstract class ActorSystemTest {
*/
@Test
fun `should not be able to stop an already terminated child`() {
- val system = factory(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- val child = ctx.spawn(object : Behavior<Unit> {}, "child")
- ctx.stop(child)
- assertFalse(ctx.stop(child))
- return this
- }
+ val system = factory(Behavior.setup<Unit> { ctx ->
+ val child = ctx.spawn(Behavior.ignore<Unit>(), "child")
+ ctx.stop(child)
+ assertFalse(ctx.stop(child))
+ Behavior.ignore()
}, name = "test")
system.run()
}
@@ -254,53 +241,98 @@ abstract class ActorSystemTest {
*/
@Test
fun `should terminate children of child when terminating it`() {
- val system = factory(object : Behavior<ActorRef<Unit>> {
- lateinit var child: ActorRef<Unit>
+ val system = factory(Behavior.setup<ActorRef<Unit>> { ctx1 ->
+ val root = ctx1.self
+ val child = ctx1.spawn(Behavior.setup<Unit> {
+ val child = ctx1.spawn(Behavior.receiveMessage<Unit> {
+ throw IllegalStateException("DELIBERATE")
+ }, "child")
+ root.send(child)
+ Behavior.ignore()
+ }, "child")
- override fun receive(ctx: ActorContext<ActorRef<Unit>>, msg: ActorRef<Unit>): Behavior<ActorRef<Unit>> {
- assertTrue(ctx.stop(child))
+
+ Behavior.receive { ctx2, msg ->
+ assertTrue(ctx2.stop(child))
msg.send(Unit) // This actor should be stopped now and not receive the message anymore
- return this
+ Behavior.stopped()
}
- override fun receiveSignal(ctx: ActorContext<ActorRef<Unit>>, signal: Signal): Behavior<ActorRef<Unit>> {
- val root = ctx.self
- child = ctx.spawn(object : Behavior<Unit> {
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
- if (signal is PreStart) {
- val child = ctx.spawn(object : Behavior<Unit> {
- override fun receive(ctx: ActorContext<Unit>, msg: Unit): Behavior<Unit> {
- throw IllegalStateException()
- }
- }, "child")
- root.send(child)
- }
- return this
- }
- }, "child")
- return this
- }
}, name = "test")
+
system.run()
}
/**
- * Test whether the reference to the actor itself is valid.
+ * Test whether [Behavior.Companion.same] works correctly.
*/
@Test
- fun `should have reference to itself`() {
- val behavior = object : Behavior<Unit> {
- override fun receive(ctx: ActorContext<Unit>, msg: Unit): Behavior<Unit> {
- throw UnsupportedOperationException()
- }
- override fun receiveSignal(ctx: ActorContext<Unit>, signal: Signal): Behavior<Unit> {
+ fun `should keep same behavior on same`() {
+ var counter = 0
+
+ val behavior = Behavior.setup<Unit> { ctx ->
+ counter++
ctx.self.send(Unit)
- return this
+
+ Behavior.receiveMessage {
+ counter++
+ Behavior.same()
}
}
val system = factory(behavior, "test")
+ system.run()
+ assertEquals(2, counter)
+ }
+
+ /**
+ * Test whether the reference to the actor itself is valid.
+ */
+ @Test
+ fun `should have reference to itself`() {
+ val behavior: Behavior<Unit> = Behavior.setup { ctx ->
+ ctx.self.send(Unit)
+ Behavior.receiveMessage { throw UnsupportedOperationException() }
+ }
+
+ val system = factory(behavior, "test")
assertThrows<UnsupportedOperationException> { system.run() }
}
+
+ /**
+ * Test whether we cannot start an actor with the [Behavior.Companion.same] behavior.
+ */
+ @Test
+ fun `should not start with same behavior`() {
+ val system = factory(Behavior.same<Unit>(), "test")
+ assertThrows<IllegalArgumentException> { system.run() }
+ }
+
+ /**
+ * Test whether we cannot start an actor with the [Behavior.Companion.unhandled] behavior.
+ */
+ @Test
+ fun `should not start with unhandled behavior`() {
+ val system = factory(Behavior.unhandled<Unit>(), "test")
+ assertThrows<IllegalArgumentException> { system.run() }
+ }
+
+ /**
+ * Test whether we cannot start an actor with deferred unhandled behavior.
+ */
+ @Test
+ fun `should not start with deferred unhandled behavior`() {
+ val system = factory(Behavior.setup<Unit> { Behavior.unhandled() }, "test")
+ assertThrows<IllegalArgumentException> { system.run() }
+ }
+
+ /**
+ * Test whether we can start an actor with the [Behavior.Companion.stopped] behavior.
+ */
+ @Test
+ fun `should start with stopped behavior`() {
+ val system = factory(Behavior.stopped<Unit>(), "test")
+ system.run()
+ }
}
}
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index 3eaddf51..3da82b3d 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -29,6 +29,7 @@ import com.atlarge.odcsim.ActorPath
import com.atlarge.odcsim.ActorRef
import com.atlarge.odcsim.ActorSystem
import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.BehaviorInterpreter
import com.atlarge.odcsim.Duration
import com.atlarge.odcsim.Instant
import com.atlarge.odcsim.PostStop
@@ -58,6 +59,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override val path: ActorPath = ActorPath.Root(name = "/user")
/**
+ * A flag to indicate the system has started.
+ */
+ private var isStarted: Boolean = false
+
+ /**
* The event queue to process
*/
private val queue: PriorityQueue<Envelope> = PriorityQueue(Comparator
@@ -69,10 +75,21 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
*/
private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
+ init {
+ registry[path] = Actor(this, root)
+ schedule(this, PreStart, .0)
+ }
+
override fun run(until: Duration) {
require(until >= .0) { "The given instant must be a non-negative number" }
- while (true) {
+ // Start the root actor on initial run
+ if (!isStarted) {
+ registry[path]!!.start()
+ isStarted = true
+ }
+
+ while (time < until) {
val envelope = queue.peek() ?: break
val delivery = envelope.time.takeUnless { it > until } ?: break
@@ -99,13 +116,15 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
*/
private var nextId: Long = 0
- init {
- registry[path] = Actor(this, root)
- schedule(this, PreStart, .0)
- }
-
- private inner class Actor<T : Any>(override val self: ActorRef<T>, var behavior: Behavior<T>) : ActorContext<T> {
+ /**
+ * An actor as represented in the Omega engine.
+ *
+ * @param self The [ActorRef] to this actor.
+ * @param initialBehavior The initial behavior of this actor.
+ */
+ private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> {
val children: MutableSet<Actor<*>> = mutableSetOf()
+ val interpreter = BehaviorInterpreter(initialBehavior)
override val time: Instant
get() = this@OmegaActorSystem.time
@@ -117,6 +136,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
registry[ref.path] = actor
children += actor
schedule(ref, PreStart, .0)
+ actor.start()
}
return ref
}
@@ -127,25 +147,49 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
return false
}
val ref = registry[child.path] ?: return false
- ref.terminate()
+ ref.stop()
return true
}
/**
+ * Start this actor.
+ */
+ fun start() {
+ interpreter.start(this)
+ }
+
+ /**
+ * Stop this actor.
+ */
+ fun stop() {
+ interpreter.stop(this)
+ terminate()
+ interpreter.interpretSignal(this, PostStop)
+ }
+
+ /**
* Terminate this actor and its children.
*/
fun terminate() {
children.forEach { it.terminate() }
registry.remove(self.path)
- interpretMessage(PostStop)
}
/**
- * Interpret the given message send to an actor. Make sure the message is of the correct type.
+ * Interpret the given message send to an actor.
*/
fun interpretMessage(msg: Any) {
- @Suppress("UNCHECKED_CAST")
- behavior = if (msg is Signal) behavior.receiveSignal(this, msg) else behavior.receive(this, msg as T)
+ if (msg is Signal) {
+ interpreter.interpretSignal(this, msg)
+ } else {
+ @Suppress("UNCHECKED_CAST")
+ interpreter.interpretMessage(this, msg as T)
+ }
+
+ if (!interpreter.isAlive) {
+ terminate()
+ interpreter.interpretSignal(this, PostStop)
+ }
}
override fun equals(other: Any?): Boolean =