summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-06 18:21:13 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-14 12:55:56 +0200
commit8a4972730965506cee3cbbdeb2b735454d4b6500 (patch)
tree86c5373c07faef732a41401e7cea09d267b3c6d6
parent5b48cdbad2493c7af9e79bb9996f195ace3123e5 (diff)
feat: Add support for watching actor termination
This change adds support for tracking termination of actors in the system.
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt17
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt2
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt10
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt3
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt7
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt56
-rw-r--r--odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt36
-rw-r--r--odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt4
8 files changed, 111 insertions, 24 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt
index 3a704f32..8ea4a09d 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorContext.kt
@@ -84,6 +84,23 @@ interface ActorContext<T : Any> {
fun stop(child: ActorRef<*>): Boolean
/**
+ * Watch the specified [ActorRef] for termination of the referenced actor. On termination of the watched actor,
+ * a [Terminated] signal is sent to this actor.
+ *
+ * @param target The target actor to watch.
+ */
+ fun watch(target: ActorRef<*>)
+
+ /**
+ * Revoke the registration established by [watch].
+ *
+ * In case there exists no registration for the specified [target], no action will be performed.
+ *
+ * @param target The target actor to unwatch.
+ */
+ fun unwatch(target: ActorRef<*>)
+
+ /**
* Synchronize the local virtual time of this target with the other referenced actor's local virtual time.
*
* By default, actors are not guaranteed to be synchronized, meaning that for some implementations, virtual time may
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
index 8298ed28..9ad7f83f 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
@@ -167,7 +167,7 @@ val <T : Any> Behavior<T>.isAlive get() = this !is StoppedBehavior
/**
* A flag to indicate whether the last message/signal went unhandled.
*/
-val <T : Any> Behavior<T>.isUnhandled get() = this !is UnhandledBehavior
+val <T : Any> Behavior<T>.isUnhandled get() = this is UnhandledBehavior
// The special behaviors are kept in this file as to be able to seal the Behavior class to prevent users from extending
// it.
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
index 35aad18b..e393c6a1 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/Signals.kt
@@ -48,4 +48,12 @@ object PostStop : Signal
*
* @property target The target object that has timed out.
*/
-class Timeout(val target: Any) : Signal
+data class Timeout(val target: Any) : Signal
+
+/**
+ * A lifecycle signal to indicate that an actor that was watched has terminated.
+ *
+ * @property ref The reference to the actor that has terminated.
+ * @property failure The failure that caused the termination, or `null` on graceful termination.
+ */
+data class Terminated(val ref: ActorRef<*>, val failure: Throwable? = null) : Signal
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
index 1d91f5b8..694a107c 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/coroutines/dsl/Timeout.kt
@@ -28,6 +28,7 @@ import com.atlarge.odcsim.Duration
import com.atlarge.odcsim.Timeout
import com.atlarge.odcsim.coroutines.SuspendingActorContext
import com.atlarge.odcsim.coroutines.suspendWithBehavior
+import com.atlarge.odcsim.internal.sendSignal
import com.atlarge.odcsim.receiveSignal
import com.atlarge.odcsim.setup
import com.atlarge.odcsim.unhandled
@@ -43,7 +44,7 @@ suspend fun <T : Any> SuspendingActorContext<T>.timeout(after: Duration) =
setup { ctx ->
val target = this
@Suppress("UNCHECKED_CAST")
- ctx.send(ctx.self, Timeout(target) as T, after)
+ ctx.sendSignal(ctx.self, Timeout(target), after)
receiveSignal { _, signal ->
if (signal is Timeout && signal.target == target) {
cont.resume(Unit)
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt
index 8d533e9e..a9e20d0e 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/Coroutines.kt
@@ -68,7 +68,8 @@ interface SuspendingActorContextImpl<T : Any> : SuspendingActorContext<T> {
*/
internal class SuspendingBehaviorImpl<T : Any>(
private var actorContext: ActorContext<T>,
- initialBehavior: SuspendingBehavior<T>) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> {
+ initialBehavior: SuspendingBehavior<T>
+) : ReceivingBehavior<T>(), SuspendingActorContextImpl<T> {
/**
* The next behavior to use.
@@ -106,6 +107,10 @@ internal class SuspendingBehaviorImpl<T : Any>(
override fun stop(child: ActorRef<*>): Boolean = actorContext.stop(child)
+ override fun watch(target: ActorRef<*>) = actorContext.watch(target)
+
+ override fun unwatch(target: ActorRef<*>) = actorContext.unwatch(target)
+
override fun sync(target: ActorRef<*>) = actorContext.sync(target)
override fun unsync(target: ActorRef<*>) = actorContext.unsync(target)
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index c1d487b7..8930fb96 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -35,10 +35,14 @@ import com.atlarge.odcsim.Instant
import com.atlarge.odcsim.PostStop
import com.atlarge.odcsim.PreStart
import com.atlarge.odcsim.Signal
+import com.atlarge.odcsim.Terminated
import com.atlarge.odcsim.internal.BehaviorInterpreter
import com.atlarge.odcsim.internal.logging.LoggerImpl
+import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl
import org.slf4j.Logger
+import java.util.Collections
import java.util.PriorityQueue
+import java.util.WeakHashMap
import kotlin.math.max
/**
@@ -119,7 +123,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun send(msg: T, after: Duration) = schedule(this, msg, after)
override fun terminate() {
- registry[path]?.stop()
+ registry[path]?.stop(null)
}
override fun compareTo(other: ActorRef<*>): Int = path.compareTo(other.path)
@@ -138,6 +142,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
private inner class Actor<T : Any>(override val self: ActorRef<T>, initialBehavior: Behavior<T>) : ActorContext<T> {
val children: MutableSet<Actor<*>> = mutableSetOf()
val interpreter = BehaviorInterpreter(initialBehavior)
+ val watchers: MutableSet<ActorPath> = Collections.newSetFromMap(WeakHashMap<ActorPath, Boolean>())
override val time: Instant
get() = this@OmegaActorSystem.time
@@ -167,10 +172,18 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
return false
}
val ref = registry[child.path] ?: return false
- ref.stop()
+ ref.stop(null)
return true
}
+ override fun watch(target: ActorRef<*>) {
+ registry[target.path]?.watchers?.add(path)
+ }
+
+ override fun unwatch(target: ActorRef<*>) {
+ registry[target.path]?.watchers?.remove(path)
+ }
+
// Synchronization of actors in a single-threaded simulation is trivial: all actors are consistent in virtual
// time.
override fun sync(target: ActorRef<*>) {}
@@ -189,18 +202,13 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
/**
* Stop this actor.
*/
- fun stop() {
+ fun stop(failure: Throwable?) {
interpreter.stop(this)
- terminate()
- interpreter.interpretSignal(this, PostStop)
- }
-
- /**
- * Terminate this actor and its children.
- */
- fun terminate() {
- children.forEach { it.stop() }
+ children.forEach { it.stop(failure) }
+ val termination = Terminated(self, failure)
+ watchers.forEach { schedule(it, termination, 0.0) }
registry.remove(self.path)
+ interpreter.interpretSignal(this, PostStop)
}
/**
@@ -215,8 +223,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
}
if (!interpreter.isAlive) {
- terminate()
- interpreter.interpretSignal(this, PostStop)
+ stop(null)
}
}
@@ -232,10 +239,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? {
return try {
block(this)
- } catch (e: Exception) {
+ } catch (t: Throwable) {
// Forcefully stop the actor if it crashed
- stop()
- log.error("Unhandled exception in actor $path", e)
+ stop(t)
+ log.error("Unhandled exception in actor $path", t)
null
}
}
@@ -277,12 +284,23 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
/**
* Schedule a message to be processed by the engine.
*
- * @property destination The destination of the message.
+ * @param destination The destination of the message.
* @param message The message to schedule.
* @param delay The time to wait before processing the message.
*/
private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) {
+ schedule(destination.path, message, delay)
+ }
+
+ /**
+ * Schedule a message to be processed by the engine.
+ *
+ * @param path The path to the destination of the message.
+ * @param message The message to schedule.
+ * @param delay The time to wait before processing the message.
+ */
+ private fun schedule(path: ActorPath, message: Any, delay: Duration) {
require(delay >= .0) { "The given delay must be a non-negative number" }
- queue.add(EnvelopeImpl(nextId++, destination.path, time + delay, message))
+ queue.add(EnvelopeImpl(nextId++, path, time + delay, message))
}
}
diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt
index cb75c2ac..867e7c11 100644
--- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt
+++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt
@@ -28,9 +28,13 @@ import com.atlarge.odcsim.ActorPath
import com.atlarge.odcsim.ActorRef
import com.atlarge.odcsim.ActorSystemFactory
import com.atlarge.odcsim.Behavior
+import com.atlarge.odcsim.Terminated
+import com.atlarge.odcsim.coroutines.dsl.timeout
+import com.atlarge.odcsim.coroutines.suspending
import com.atlarge.odcsim.empty
import com.atlarge.odcsim.ignore
import com.atlarge.odcsim.receiveMessage
+import com.atlarge.odcsim.receiveSignal
import com.atlarge.odcsim.same
import com.atlarge.odcsim.setup
import com.atlarge.odcsim.stopped
@@ -278,7 +282,7 @@ abstract class ActorSystemContract {
val system = factory(setup<ActorRef<Unit>> { ctx ->
val root = ctx.self
val child = ctx.spawn(setup<Unit> {
- val child = ctx.spawn(receiveMessage<Unit> {
+ val child = it.spawn(receiveMessage<Unit> {
throw IllegalStateException("DELIBERATE")
}, "child")
ctx.send(root, child)
@@ -363,6 +367,36 @@ abstract class ActorSystemContract {
assertEquals(1, counter)
system.terminate()
}
+
+ /**
+ * Test whether an actor can watch for termination.
+ */
+ @Test
+ fun `should watch for termination`() {
+ var received = false
+ val system = factory(setup<Nothing> { ctx ->
+ val child = ctx.spawn(suspending<Nothing> {
+ it.timeout(50.0)
+ stopped()
+ }, "child")
+ ctx.watch(child)
+
+ receiveSignal { _, signal ->
+ when (signal) {
+ is Terminated -> {
+ received = true
+ stopped()
+ }
+ else ->
+ same()
+ }
+ }
+ }, "test")
+
+ system.run()
+ system.terminate()
+ assertTrue(received)
+ }
}
companion object {
diff --git a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt
index d11d47a3..047e4c70 100644
--- a/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt
+++ b/odcsim-testkit/src/main/kotlin/com/atlarge/odcsim/testkit/internal/ActorContextStub.kt
@@ -81,6 +81,10 @@ internal class ActorContextStub<T : Any>(private val owner: BehaviorTestKitImpl<
return true
}
+ override fun watch(target: ActorRef<*>) {}
+
+ override fun unwatch(target: ActorRef<*>) {}
+
override fun sync(target: ActorRef<*>) {}
override fun unsync(target: ActorRef<*>) {}