summaryrefslogtreecommitdiff
path: root/odcsim-engine-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-09 17:54:02 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-14 12:55:56 +0200
commit2f6dcaef25d80a1411512e482953c83990149fd1 (patch)
tree7832040f6c99c83140d99b732f6583298e167ef6 /odcsim-engine-omega/src
parent1f77d1011577c54e98ad0cbbd898817f98000881 (diff)
refactor: Make actor spawning and stopping behavior consistent
This change makes the behavior for spawning and stopping a child actor more consistent and better specified in the documentation.
Diffstat (limited to 'odcsim-engine-omega/src')
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt112
1 files changed, 77 insertions, 35 deletions
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index 37b5395f..ec088b98 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -36,8 +36,11 @@ import com.atlarge.odcsim.PostStop
import com.atlarge.odcsim.PreStart
import com.atlarge.odcsim.Signal
import com.atlarge.odcsim.Terminated
+import com.atlarge.odcsim.empty
import com.atlarge.odcsim.internal.BehaviorInterpreter
import com.atlarge.odcsim.internal.logging.LoggerImpl
+import com.sun.org.apache.xalan.internal.lib.ExsltDatetime.time
+import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl
import org.slf4j.Logger
import java.util.Collections
import java.util.PriorityQueue
@@ -51,54 +54,73 @@ import kotlin.math.max
* This engine implementation is a single-threaded implementation, running actors synchronously and
* provides a single priority queue for all events (messages, ticks, etc) that occur.
*
- * @param root The behavior of the root actor.
+ * @param guardianBehavior The behavior of the guardian (root) actor.
* @param name The name of the engine instance.
*/
-class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> {
+class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val name: String) : ActorSystem<T>, ActorRef<T> {
/**
- * The current point in simulation time.
+ * The state of the actor system.
*/
- override var time: Instant = .0
+ private var state: ActorSystemState = ActorSystemState.CREATED
/**
- * The path to the root actor.
+ * The event queue to process
*/
- override val path: ActorPath = ActorPath.Root(name = "/user")
+ private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue(
+ Comparator
+ .comparingDouble(EnvelopeImpl::time)
+ .thenComparingLong(EnvelopeImpl::id)
+ )
/**
- * The state of the actor system.
+ * The registry of actors in the system.
*/
- private var state: ActorSystemState = ActorSystemState.CREATED
+ private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
/**
- * The event queue to process
+ * The root actor path of the system.
*/
- private val queue: PriorityQueue<EnvelopeImpl> = PriorityQueue(Comparator
- .comparingDouble(EnvelopeImpl::time)
- .thenComparingLong(EnvelopeImpl::id))
+ private val root: ActorPath = ActorPath.Root()
/**
- * The registry of actors in the system.
+ * The system actor path.
*/
- private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
+ private val system: ActorPath = root / "system"
+
+ /**
+ * The current point in simulation time.
+ */
+ override var time: Instant = .0
+
+ /**
+ * The path to the root actor.
+ */
+ override val path: ActorPath = root / "user"
init {
- registry[path] = Actor(this, root)
+ registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>())
+ registry[path] = Actor(this, guardianBehavior)
schedule(this, PreStart, .0)
}
override fun run(until: Duration) {
require(until >= .0) { "The given instant must be a non-negative number" }
- // Start the root actor on initial run
+ // Start the system/guardian actor on initial run
if (state == ActorSystemState.CREATED) {
state = ActorSystemState.STARTED
+ registry[system]!!.isolate { it.start() }
registry[path]!!.isolate { it.start() }
} else if (state == ActorSystemState.TERMINATED) {
throw IllegalStateException("The ActorSystem has been terminated.")
}
while (time < until) {
+ // Check whether the system was interrupted
+ if (Thread.interrupted()) {
+ throw InterruptedException()
+ }
+
val envelope = queue.peek() ?: break
val delivery = envelope.time.takeUnless { it > until } ?: break
@@ -124,6 +146,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun terminate() {
registry[path]?.stop(null)
+ registry[system]?.stop(null)
+ }
+
+ override suspend fun <U : Any> spawnSystem(behavior: Behavior<U>, name: String): ActorRef<U> {
+ return registry[system]!!.spawn(behavior, name)
}
override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
@@ -140,22 +167,28 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
* @param initialBehavior The initial behavior of this actor.
*/
private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> {
- val children: MutableSet<Actor<*>> = mutableSetOf()
+ val childActors: MutableMap<String, Actor<*>> = mutableMapOf()
val interpreter = BehaviorInterpreter(initialBehavior)
val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>())
override val time: Instant
get() = this@OmegaActorSystem.time
+ override val children: List<ActorRef<*>>
+ get() = childActors.values.map { it.self }
+
override val system: ActorSystem<*>
get() = this@OmegaActorSystem
override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl(this) }
+ override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self
+
override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref, msg, after)
override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> {
- require(!name.startsWith("$")) { "Name may not start with $-sign" }
+ require(name.isNotEmpty()) { "Actor name may not be empty" }
+ require(!name.startsWith("$")) { "Actor name may not start with $-sign" }
return internalSpawn(behavior, name)
}
@@ -165,25 +198,34 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
}
private fun <U : Any> internalSpawn(behavior: Behavior<U>, name: String): ActorRef<U> {
+ require(name !in childActors) { "Actor name $name not unique" }
val ref = ActorRefImpl<U>(this@OmegaActorSystem, self.path.child(name))
- if (ref.path !in registry) {
- val actor = Actor(ref, behavior)
- registry[ref.path] = actor
- children += actor
- schedule(ref, PreStart, .0)
- actor.start()
- }
+ val actor = Actor(ref, behavior)
+ registry[ref.path] = actor
+ childActors[name] = actor
+ schedule(ref, PreStart, .0)
+ actor.start()
return ref
}
- override fun stop(child: ActorRef<*>): Boolean {
- if (child.path.parent != self.path) {
- // This is not a child of this actor
- return false
+ override fun stop(child: ActorRef<*>) {
+ when {
+ // Must be a direct child of this actor
+ child.path.parent == self.path -> {
+ val ref = childActors[child.path.name] ?: return
+ ref.stop(null)
+ }
+ self == child -> throw IllegalArgumentException(
+ "Only direct children of an actor may be stopped through the actor context, " +
+ "but you tried to stop [$self] by passing its ActorRef to the `stop` method. " +
+ "Stopping self has to be expressed as explicitly returning a Stop Behavior."
+ )
+ else -> throw IllegalArgumentException(
+ "Only direct children of an actor may be stopped through the actor context, " +
+ "but [$child] is not a child of [$self]. Stopping other actors has to be expressed as " +
+ "an explicit stop message that the actor accepts."
+ )
}
- val ref = registry[child.path] ?: return false
- ref.stop(null)
- return true
}
override fun watch(target: ActorRef<*>) {
@@ -214,11 +256,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
*/
fun stop(failure: Throwable?) {
interpreter.stop(this)
- children.forEach { it.stop(failure) }
- val termination = Terminated(self, failure)
- watchers.forEach { schedule(it, termination, 0.0) }
+ childActors.values.forEach { it.stop(failure) }
registry.remove(self.path)
interpreter.interpretSignal(this, PostStop)
+ val termination = Terminated(self, failure)
+ watchers.forEach { schedule(it, termination, 0.0) }
}
/**