diff options
10 files changed, 155 insertions, 69 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt index 499e7df0..f8128490 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt @@ -38,6 +38,11 @@ interface ActorContext<T : Any> { val self: ActorRef<T> /** + * A view of the children of this actor. + */ + val children: List<ActorRef<*>> + + /** * The point of time within the simulation. */ val time: Instant @@ -53,6 +58,14 @@ interface ActorContext<T : Any> { val log: Logger /** + * Obtain the child of this actor with the specified name. + * + * @param name The name of the child actor to obtain. + * @return The reference to the child actor or `null` if it does not exist. + */ + fun getChild(name: String): ActorRef<*>? + + /** * Send the specified message to the actor referenced by this [ActorRef]. * * Please note that callees must guarantee that messages are sent strictly in increasing time. @@ -69,6 +82,9 @@ interface ActorContext<T : Any> { /** * Spawn a child actor from the given [Behavior] and with the specified name. * + * The name may not be empty or start with "$". Moreover, the name of an actor must be unique and this method + * will throw an [IllegalArgumentException] in case a child actor of the given name already exists. + * * @param behavior The behavior of the child actor to spawn. * @param name The name of the child actor to spawn. * @return A reference to the child that has/will be spawned. @@ -84,12 +100,16 @@ interface ActorContext<T : Any> { fun <U : Any> spawnAnonymous(behavior: Behavior<U>): ActorRef<U> /** - * Request the specified child actor to be stopped in asynchronous fashion. + * Force the specified child actor to terminate after it finishes processing its current message. + * Nothing will happen if the child is already stopped. + * + * Only direct children of an actor may be stopped through the actor context. Trying to stop other actors via this + * method will result in an [IllegalArgumentException]. Instead, stopping other actors has to be expressed as + * an explicit stop message that the actor accept. * * @param child The reference to the child actor to stop. - * @return `true` if the ref points to a child actor, otherwise `false`. */ - fun stop(child: ActorRef<*>): Boolean + fun stop(child: ActorRef<*>) /** * Watch the specified [ActorRef] for termination of the referenced actor. On termination of the watched actor, diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorPath.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorPath.kt index f51a4fed..a6c716a2 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorPath.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorPath.kt @@ -51,6 +51,11 @@ sealed class ActorPath : Comparable<ActorPath>, Serializable { fun child(name: String): ActorPath = Child(this, name) /** + * Create a new child actor path. + */ + operator fun div(name: String): ActorPath = child(name) + + /** * Recursively create a descendant’s path by appending all child names. */ fun descendant(children: Iterable<String>): ActorPath = children.fold(this) { parent, name -> diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt index bae1fb74..d65beebd 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt @@ -59,11 +59,18 @@ interface ActorSystem<in T : Any> : ActorRef<T> { fun send(msg: T, after: Duration = 0.1) /** - * Terminates this actor system. + * Terminates this actor system in an asynchronous fashion. * * This will stop the root actor and in turn will recursively stop all its child actors. - * - * This is an asynchronous operation. */ fun terminate() + + /** + * Create an actor in the "/system" namespace. This actor will be shut down during `system.terminate()` only after + * all user actors have terminated. + * + * @param behavior The behavior of the system actor to spawn. + * @param name The name of the system actor to spawn. + */ + suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> } diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt index 2ad042e3..9b707348 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt @@ -37,10 +37,19 @@ object PreStart : Signal /** * Lifecycle signal that is fired after this actor and all its child actors (transitively) have terminated. + * The [Terminated] signal is only sent to registered watchers after this signal has been processed. */ object PostStop : Signal /** + * A lifecycle signal to indicate that an actor that was watched has terminated. + * + * @property ref The reference to the actor that has terminated. + * @property failure The failure that caused the termination, or `null` on graceful termination. + */ +data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal + +/** * A [Signal] to indicate an actor has timed out. * * This class contains a [target] property in order to allow nested behavior to function properly when multiple layers @@ -49,11 +58,3 @@ object PostStop : Signal * @property target The target object that has timed out. */ data class Timeout(val target: Any) : Signal - -/** - * A lifecycle signal to indicate that an actor that was watched has terminated. - * - * @property ref The reference to the actor that has terminated. - * @property failure The failure that caused the termination, or `null` on graceful termination. - */ -data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt index 13e722fc..f9e8de00 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt @@ -95,19 +95,24 @@ internal class SuspendingBehaviorImpl<T : Any>( override val time: Instant get() = actorContext.time + override val children: List<ActorRef<*>> + get() = actorContext.children + override val system: ActorSystem<*> get() = actorContext.system override val log: Logger get() = actorContext.log + override fun getChild(name: String): ActorRef<*>? = actorContext.getChild(name) + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = actorContext.send(ref, msg, after) override fun <U : Any> spawn(behavior: Behavior<U>, name: String) = actorContext.spawn(behavior, name) override fun <U : Any> spawnAnonymous(behavior: Behavior<U>) = actorContext.spawnAnonymous(behavior) - override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child) + override fun stop(child: ActorRef<*>) = actorContext.stop(child) override fun watch(target: ActorRef<*>) = actorContext.watch(target) diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index 37b5395f..ec088b98 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -36,8 +36,11 @@ import com.atlarge.odcsim.PostStop import com.atlarge.odcsim.PreStart import com.atlarge.odcsim.Signal import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.empty import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.logging.LoggerImpl +import com.sun.org.apache.xalan.internal.lib.ExsltDatetime.time +import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl import org.slf4j.Logger import java.util.Collections import java.util.PriorityQueue @@ -51,54 +54,73 @@ import kotlin.math.max * This engine implementation is a single-threaded implementation, running actors synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur. * - * @param root The behavior of the root actor. + * @param guardianBehavior The behavior of the guardian (root) actor. * @param name The name of the engine instance. */ -class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> { +class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> { /** - * The current point in simulation time. + * The state of the actor system. */ - override var time: Instant = .0 + private var state: ActorSystemState = ActorSystemState.CREATED /** - * The path to the root actor. + * The event queue to process */ - override val path: ActorPath = ActorPath.Root(name = "/user") + private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue( + Comparator + .comparingDouble(EnvelopeImpl::time) + .thenComparingLong(EnvelopeImpl::id) + ) /** - * The state of the actor system. + * The registry of actors in the system. */ - private var state: ActorSystemState = ActorSystemState.CREATED + private val registry: MutableMap<ActorPath, Actor<*>> = HashMap() /** - * The event queue to process + * The root actor path of the system. */ - private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue(Comparator - .comparingDouble(EnvelopeImpl::time) - .thenComparingLong(EnvelopeImpl::id)) + private val root: ActorPath = ActorPath.Root() /** - * The registry of actors in the system. + * The system actor path. */ - private val registry: MutableMap<ActorPath, Actor<*>> = HashMap() + private val system: ActorPath = root / "system" + + /** + * The current point in simulation time. + */ + override var time: Instant = .0 + + /** + * The path to the root actor. + */ + override val path: ActorPath = root / "user" init { - registry[path] = Actor(this, root) + registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>()) + registry[path] = Actor(this, guardianBehavior) schedule(this, PreStart, .0) } override fun run(until: Duration) { require(until >= .0) { "The given instant must be a non-negative number" } - // Start the root actor on initial run + // Start the system/guardian actor on initial run if (state == ActorSystemState.CREATED) { state = ActorSystemState.STARTED + registry[system]!!.isolate { it.start() } registry[path]!!.isolate { it.start() } } else if (state == ActorSystemState.TERMINATED) { throw IllegalStateException("The ActorSystem has been terminated.") } while (time < until) { + // Check whether the system was interrupted + if (Thread.interrupted()) { + throw InterruptedException() + } + val envelope = queue.peek() ?: break val delivery = envelope.time.takeUnless { it > until } ?: break @@ -124,6 +146,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun terminate() { registry[path]?.stop(null) + registry[system]?.stop(null) + } + + override suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> { + return registry[system]!!.spawn(behavior, name) } override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path) @@ -140,22 +167,28 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) * @param initialBehavior The initial behavior of this actor. */ private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> { - val children: MutableSet<Actor<*>> = mutableSetOf() + val childActors: MutableMap<String, Actor<*>> = mutableMapOf() val interpreter = BehaviorInterpreter(initialBehavior) val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>()) override val time: Instant get() = this@OmegaActorSystem.time + override val children: List<ActorRef<*>> + get() = childActors.values.map { it.self } + override val system: ActorSystem<*> get() = this@OmegaActorSystem override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl(this) } + override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref, msg, after) override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> { - require(!name.startsWith("$")) { "Name may not start with $-sign" } + require(name.isNotEmpty()) { "Actor name may not be empty" } + require(!name.startsWith("$")) { "Actor name may not start with $-sign" } return internalSpawn(behavior, name) } @@ -165,25 +198,34 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) } private fun <U : Any> internalSpawn(behavior: Behavior<U>, name: String): ActorRef<U> { + require(name !in childActors) { "Actor name $name not unique" } val ref = ActorRefImpl<U>(this@OmegaActorSystem, self.path.child(name)) - if (ref.path !in registry) { - val actor = Actor(ref, behavior) - registry[ref.path] = actor - children += actor - schedule(ref, PreStart, .0) - actor.start() - } + val actor = Actor(ref, behavior) + registry[ref.path] = actor + childActors[name] = actor + schedule(ref, PreStart, .0) + actor.start() return ref } - override fun stop(child: ActorRef<*>): Boolean { - if (child.path.parent != self.path) { - // This is not a child of this actor - return false + override fun stop(child: ActorRef<*>) { + when { + // Must be a direct child of this actor + child.path.parent == self.path -> { + val ref = childActors[child.path.name] ?: return + ref.stop(null) + } + self == child -> throw IllegalArgumentException( + "Only direct children of an actor may be stopped through the actor context, " + + "but you tried to stop [$self] by passing its ActorRef to the `stop` method. " + + "Stopping self has to be expressed as explicitly returning a Stop Behavior." + ) + else -> throw IllegalArgumentException( + "Only direct children of an actor may be stopped through the actor context, " + + "but [$child] is not a child of [$self]. Stopping other actors has to be expressed as " + + "an explicit stop message that the actor accepts." + ) } - val ref = registry[child.path] ?: return false - ref.stop(null) - return true } override fun watch(target: ActorRef<*>) { @@ -214,11 +256,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) */ fun stop(failure: Throwable?) { interpreter.stop(this) - children.forEach { it.stop(failure) } - val termination = Terminated(self, failure) - watchers.forEach { schedule(it, termination, 0.0) } + childActors.values.forEach { it.stop(failure) } registry.remove(self.path) interpreter.interpretSignal(this, PostStop) + val termination = Terminated(self, failure) + watchers.forEach { schedule(it, termination, 0.0) } } /** diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt index e7639db8..593f587b 100644 --- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt +++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt @@ -29,6 +29,7 @@ import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.ActorSystemFactory import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.coroutines.dsl.timeout import com.atlarge.odcsim.coroutines.suspending import com.atlarge.odcsim.empty import com.atlarge.odcsim.ignore @@ -38,7 +39,6 @@ import com.atlarge.odcsim.same import com.atlarge.odcsim.setup import com.atlarge.odcsim.stopped import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Nested @@ -230,9 +230,8 @@ abstract class ActorSystemContract { fun `should allow stopping of child actors`() { val system = factory(setup<Unit> { ctx -> val ref = ctx.spawn(receiveMessage<Unit> { throw UnsupportedOperationException() }, "child") - assertTrue(ctx.stop(ref)) + ctx.stop(ref) assertEquals("child", ref.path.name) - ignore() }, name = "test") @@ -248,7 +247,7 @@ abstract class ActorSystemContract { val system = factory(setup<Unit> { ctx1 -> val child1 = ctx1.spawn(ignore<Unit>(), "child-1") ctx1.spawn(setup<Unit> { ctx2 -> - assertFalse(ctx2.stop(child1)) + ctx2.stop(child1) ignore() }, "child-2") @@ -259,14 +258,14 @@ abstract class ActorSystemContract { } /** - * Test whether terminating an already terminated child fails. + * Test whether stopping a child is idempotent. */ @Test - fun `should not be able to stop an already terminated child`() { + fun `should be able to stop a child twice`() { val system = factory(setup<Unit> { ctx -> val child = ctx.spawn(ignore<Unit>(), "child") ctx.stop(child) - assertFalse(ctx.stop(child)) + ctx.stop(child) ignore() }, name = "test") system.run() @@ -289,7 +288,7 @@ abstract class ActorSystemContract { }, "child") receiveMessage { msg -> - assertTrue(ctx.stop(child)) + ctx.stop(child) ctx.send(msg, Unit) // This actor should be stopped now and not receive the message anymore stopped() } @@ -375,7 +374,7 @@ abstract class ActorSystemContract { var received = false val system = factory(setup<Nothing> { ctx -> val child = ctx.spawn(suspending<Nothing> { - it.timeout(50.0) + timeout(50.0) stopped() }, "child") ctx.watch(child) diff --git a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt index 7035b908..79873141 100644 --- a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt +++ b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt @@ -43,11 +43,14 @@ internal class ActorContextStub<T : Any>(private val owner: BehaviorTestKitImpl< /** * The children of this context. */ - val children = HashMap<String, BehaviorTestKitImpl<*>>() + val childActors = HashMap<String, BehaviorTestKitImpl<*>>() override val self: ActorRef<T> get() = owner.ref + override val children: List<ActorRef<*>> + get() = childActors.values.map { it.ref } + override val time: Instant get() = owner.time @@ -59,6 +62,8 @@ internal class ActorContextStub<T : Any>(private val owner: BehaviorTestKitImpl< LoggerImpl(this) } + override fun getChild(name: String): ActorRef<*>? = childActors[name]?.ref + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) { if (ref !is TestInboxImpl.ActorRefImpl) { throw IllegalArgumentException("The referenced ActorRef is not part of the test kit") @@ -69,7 +74,7 @@ internal class ActorContextStub<T : Any>(private val owner: BehaviorTestKitImpl< override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> { val btk = BehaviorTestKitImpl(behavior, self.path.child(name)) - children[name] = btk + childActors[name] = btk return btk.ref } @@ -77,13 +82,9 @@ internal class ActorContextStub<T : Any>(private val owner: BehaviorTestKitImpl< return spawn(behavior, "$" + UUID.randomUUID()) } - override fun stop(child: ActorRef<*>): Boolean { - if (child.path.parent != self.path) { - // This is not a child of this actor - return false - } - children -= child.path.name - return true + override fun stop(child: ActorRef<*>) { + require(child.path.parent == self.path) { "Only direct children of an actor may be stopped through the actor context." } + childActors -= child.path.name } override fun watch(target: ActorRef<*>) {} diff --git a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorSystemStub.kt b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorSystemStub.kt index f61c1d76..fee34a48 100644 --- a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorSystemStub.kt +++ b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorSystemStub.kt @@ -25,7 +25,9 @@ package com.atlarge.odcsim.testkit.internal import com.atlarge.odcsim.ActorPath +import com.atlarge.odcsim.ActorRef import com.atlarge.odcsim.ActorSystem +import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.Duration import com.atlarge.odcsim.Instant @@ -49,4 +51,8 @@ internal class ActorSystemStub<T : Any>(private val owner: BehaviorTestKitImpl<T override val path: ActorPath get() = owner.ref.path + + override suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> { + throw IllegalStateException("Cannot spawn system actor") + } } diff --git a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/BehaviorTestKitImpl.kt b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/BehaviorTestKitImpl.kt index cb216614..2bd5b973 100644 --- a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/BehaviorTestKitImpl.kt +++ b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/BehaviorTestKitImpl.kt @@ -97,7 +97,7 @@ internal class BehaviorTestKitImpl<T : Any>( override fun <U : Any> childTestKit(name: String): BehaviorTestKit<U> { @Suppress("UNCHECKED_CAST") - return context.children[ref.path.name] as BehaviorTestKitImpl<U>? ?: throw IllegalArgumentException("$ref is not a child of $this") + return context.childActors[ref.path.name] as BehaviorTestKitImpl<U>? ?: throw IllegalArgumentException("$ref is not a child of $this") } override fun <U : Any> childTestKit(ref: ActorRef<U>): BehaviorTestKit<U> { |
