From 2f6dcaef25d80a1411512e482953c83990149fd1 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 9 May 2019 17:54:02 +0200 Subject: refactor: Make actor spawning and stopping behavior consistent This change makes the behavior for spawning and stopping a child actor more consistent and better specified in the documentation. --- .../odcsim/engine/omega/OmegaActorSystem.kt | 112 ++++++++++++++------- 1 file changed, 77 insertions(+), 35 deletions(-) (limited to 'odcsim-engine-omega/src') diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index 37b5395f..ec088b98 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -36,8 +36,11 @@ import com.atlarge.odcsim.PostStop import com.atlarge.odcsim.PreStart import com.atlarge.odcsim.Signal import com.atlarge.odcsim.Terminated +import com.atlarge.odcsim.empty import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.logging.LoggerImpl +import com.sun.org.apache.xalan.internal.lib.ExsltDatetime.time +import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl import org.slf4j.Logger import java.util.Collections import java.util.PriorityQueue @@ -51,54 +54,73 @@ import kotlin.math.max * This engine implementation is a single-threaded implementation, running actors synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur. * - * @param root The behavior of the root actor. + * @param guardianBehavior The behavior of the guardian (root) actor. * @param name The name of the engine instance. */ -class OmegaActorSystem(root: Behavior, override val name: String) : ActorSystem, ActorRef { +class OmegaActorSystem(guardianBehavior: Behavior, override val name: String) : ActorSystem, ActorRef { /** - * The current point in simulation time. + * The state of the actor system. */ - override var time: Instant = .0 + private var state: ActorSystemState = ActorSystemState.CREATED /** - * The path to the root actor. + * The event queue to process */ - override val path: ActorPath = ActorPath.Root(name = "/user") + private val queue: PriorityQueue = PriorityQueue( + Comparator + .comparingDouble(EnvelopeImpl::time) + .thenComparingLong(EnvelopeImpl::id) + ) /** - * The state of the actor system. + * The registry of actors in the system. */ - private var state: ActorSystemState = ActorSystemState.CREATED + private val registry: MutableMap> = HashMap() /** - * The event queue to process + * The root actor path of the system. */ - private val queue: PriorityQueue = PriorityQueue(Comparator - .comparingDouble(EnvelopeImpl::time) - .thenComparingLong(EnvelopeImpl::id)) + private val root: ActorPath = ActorPath.Root() /** - * The registry of actors in the system. + * The system actor path. */ - private val registry: MutableMap> = HashMap() + private val system: ActorPath = root / "system" + + /** + * The current point in simulation time. + */ + override var time: Instant = .0 + + /** + * The path to the root actor. + */ + override val path: ActorPath = root / "user" init { - registry[path] = Actor(this, root) + registry[system] = Actor(ActorRefImpl(this, system), empty()) + registry[path] = Actor(this, guardianBehavior) schedule(this, PreStart, .0) } override fun run(until: Duration) { require(until >= .0) { "The given instant must be a non-negative number" } - // Start the root actor on initial run + // Start the system/guardian actor on initial run if (state == ActorSystemState.CREATED) { state = ActorSystemState.STARTED + registry[system]!!.isolate { it.start() } registry[path]!!.isolate { it.start() } } else if (state == ActorSystemState.TERMINATED) { throw IllegalStateException("The ActorSystem has been terminated.") } while (time < until) { + // Check whether the system was interrupted + if (Thread.interrupted()) { + throw InterruptedException() + } + val envelope = queue.peek() ?: break val delivery = envelope.time.takeUnless { it > until } ?: break @@ -124,6 +146,11 @@ class OmegaActorSystem(root: Behavior, override val name: String) override fun terminate() { registry[path]?.stop(null) + registry[system]?.stop(null) + } + + override suspend fun spawnSystem(behavior: Behavior, name: String): ActorRef { + return registry[system]!!.spawn(behavior, name) } override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path) @@ -140,22 +167,28 @@ class OmegaActorSystem(root: Behavior, override val name: String) * @param initialBehavior The initial behavior of this actor. */ private inner class Actor(override val self: ActorRef, initialBehavior: Behavior) : ActorContext { - val children: MutableSet> = mutableSetOf() + val childActors: MutableMap> = mutableMapOf() val interpreter = BehaviorInterpreter(initialBehavior) val watchers: MutableSet = Collections.newSetFromMap(WeakHashMap()) override val time: Instant get() = this@OmegaActorSystem.time + override val children: List> + get() = childActors.values.map { it.self } + override val system: ActorSystem<*> get() = this@OmegaActorSystem override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl(this) } + override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self + override fun send(ref: ActorRef, msg: U, after: Duration) = schedule(ref, msg, after) override fun spawn(behavior: Behavior, name: String): ActorRef { - require(!name.startsWith("$")) { "Name may not start with $-sign" } + require(name.isNotEmpty()) { "Actor name may not be empty" } + require(!name.startsWith("$")) { "Actor name may not start with $-sign" } return internalSpawn(behavior, name) } @@ -165,25 +198,34 @@ class OmegaActorSystem(root: Behavior, override val name: String) } private fun internalSpawn(behavior: Behavior, name: String): ActorRef { + require(name !in childActors) { "Actor name $name not unique" } val ref = ActorRefImpl(this@OmegaActorSystem, self.path.child(name)) - if (ref.path !in registry) { - val actor = Actor(ref, behavior) - registry[ref.path] = actor - children += actor - schedule(ref, PreStart, .0) - actor.start() - } + val actor = Actor(ref, behavior) + registry[ref.path] = actor + childActors[name] = actor + schedule(ref, PreStart, .0) + actor.start() return ref } - override fun stop(child: ActorRef<*>): Boolean { - if (child.path.parent != self.path) { - // This is not a child of this actor - return false + override fun stop(child: ActorRef<*>) { + when { + // Must be a direct child of this actor + child.path.parent == self.path -> { + val ref = childActors[child.path.name] ?: return + ref.stop(null) + } + self == child -> throw IllegalArgumentException( + "Only direct children of an actor may be stopped through the actor context, " + + "but you tried to stop [$self] by passing its ActorRef to the `stop` method. " + + "Stopping self has to be expressed as explicitly returning a Stop Behavior." + ) + else -> throw IllegalArgumentException( + "Only direct children of an actor may be stopped through the actor context, " + + "but [$child] is not a child of [$self]. Stopping other actors has to be expressed as " + + "an explicit stop message that the actor accepts." + ) } - val ref = registry[child.path] ?: return false - ref.stop(null) - return true } override fun watch(target: ActorRef<*>) { @@ -214,11 +256,11 @@ class OmegaActorSystem(root: Behavior, override val name: String) */ fun stop(failure: Throwable?) { interpreter.stop(this) - children.forEach { it.stop(failure) } - val termination = Terminated(self, failure) - watchers.forEach { schedule(it, termination, 0.0) } + childActors.values.forEach { it.stop(failure) } registry.remove(self.path) interpreter.interpretSignal(this, PostStop) + val termination = Terminated(self, failure) + watchers.forEach { schedule(it, termination, 0.0) } } /** -- cgit v1.2.3