summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-11-21 23:19:32 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2019-05-13 20:26:45 +0200
commitff49cd6c9079264e04d2efa85b03a024d0e00cca (patch)
treebc925038f730e784c7d8ed3c0cbe0e07178b0417
parente1530fee893b5e9ceebf5e07b490c1c82da2e687 (diff)
feat: Add consistent exception handling in actor systems
This change makes exception handling in actor systems consistent. Implementors should not propagate unhandled exceptions in actors to the (one of the) threads running the engine, but instead should stop the offending actor and continue running. Supervision of actors should be implemented within actor behavior instead. Since BehaviorInterpreter will propagate exceptions thrown by an actor, consumers of that API can use it to catch unhandled exceptions in an actor.
-rw-r--r--odcsim-core/build.gradle1
-rw-r--r--odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt73
-rw-r--r--odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt78
-rw-r--r--odcsim-engine-omega/build.gradle2
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt20
5 files changed, 137 insertions, 37 deletions
diff --git a/odcsim-core/build.gradle b/odcsim-core/build.gradle
index 3ae40a0d..e4e61296 100644
--- a/odcsim-core/build.gradle
+++ b/odcsim-core/build.gradle
@@ -37,6 +37,7 @@ dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-api:$junit_jupiter_version"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junit_jupiter_version"
testImplementation "org.junit.platform:junit-platform-launcher:$junit_platform_version"
+ testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.0.0"
}
/* Create configuration for test suite used by implementors */
diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
index 492dc686..3385767b 100644
--- a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
+++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/ActorSystemTest.kt
@@ -24,6 +24,7 @@
package com.atlarge.odcsim
+import com.atlarge.odcsim.Behavior.Companion.same
import com.atlarge.odcsim.dsl.empty
import com.atlarge.odcsim.dsl.ignore
import com.atlarge.odcsim.dsl.receive
@@ -134,6 +135,23 @@ abstract class ActorSystemTest {
}
/**
+ * Test whether an [ActorSystem] will not process messages in the queue after the deadline.
+ */
+ @Test
+ fun `should not process messages after deadline`() {
+ var counter = 0
+ val behavior = Behavior.receiveMessage<Unit> { _ ->
+ counter++
+ same()
+ }
+ val system = factory(behavior, name = "test")
+ system.send(Unit, after = 3.0)
+ system.send(Unit, after = 1.0)
+ system.run(until = 2.0)
+ assertEquals(1, counter)
+ }
+
+ /**
* Test whether an [ActorSystem] will not initialize the root actor if the system has not been run yet.
*/
@Test
@@ -176,7 +194,8 @@ abstract class ActorSystemTest {
*/
@Test
fun `should allow spawning of child actors`() {
- val behavior = Behavior.setup<Unit> { throw UnsupportedOperationException("b") }
+ var spawned = false
+ val behavior = Behavior.setup<Unit> { spawned = true; Behavior.empty() }
val system = factory(Behavior.setup<Unit> { ctx ->
val ref = ctx.spawn(behavior, "child")
@@ -184,7 +203,8 @@ abstract class ActorSystemTest {
Behavior.ignore()
}, name = "test")
- assertThrows<UnsupportedOperationException> { system.run(until = 10.0) }
+ system.run(until = 10.0)
+ assertTrue(spawned)
}
/**
@@ -287,13 +307,15 @@ abstract class ActorSystemTest {
*/
@Test
fun `should have reference to itself`() {
+ var flag = false
val behavior: Behavior<Unit> = Behavior.setup { ctx ->
ctx.self.send(Unit)
- Behavior.receiveMessage { throw UnsupportedOperationException() }
+ Behavior.receiveMessage { flag = true; same() }
}
val system = factory(behavior, "test")
- assertThrows<UnsupportedOperationException> { system.run() }
+ system.run()
+ assertTrue(flag)
}
/**
@@ -306,24 +328,6 @@ abstract class ActorSystemTest {
}
/**
- * Test whether we cannot start an actor with the [Behavior.Companion.unhandled] behavior.
- */
- @Test
- fun `should not start with unhandled behavior`() {
- val system = factory(Behavior.unhandled<Unit>(), "test")
- assertThrows<IllegalArgumentException> { system.run() }
- }
-
- /**
- * Test whether we cannot start an actor with deferred unhandled behavior.
- */
- @Test
- fun `should not start with deferred unhandled behavior`() {
- val system = factory(Behavior.setup<Unit> { Behavior.unhandled() }, "test")
- assertThrows<IllegalArgumentException> { system.run() }
- }
-
- /**
* Test whether we can start an actor with the [Behavior.Companion.stopped] behavior.
*/
@Test
@@ -332,22 +336,23 @@ abstract class ActorSystemTest {
system.run()
}
- /**
- * Test whether deferred behavior that returns [Behavior.Companion.same] fails.
- */
- @Test
- fun `should not allow setup to return same`() {
- val system = factory(Behavior.setup<Unit> { Behavior.same() }, "test")
- assertThrows<IllegalArgumentException> { system.run() }
- }
/**
- * Test whether deferred behavior that returns [Behavior.Companion.unhandled] fails.
+ * Test whether an actor that is crashed cannot receive more messages.
*/
@Test
- fun `should not allow setup to return unhandled`() {
- val system = factory(Behavior.setup<Unit> { Behavior.unhandled() }, "test")
- assertThrows<IllegalArgumentException> { system.run() }
+ fun `should stop if it crashes`() {
+ var counter = 0
+ val system = factory(Behavior.receiveMessage<Unit> {
+ counter++
+ throw IllegalArgumentException("STAGED")
+ }, "test")
+
+ system.send(Unit)
+ system.send(Unit)
+
+ system.run()
+ assertEquals(1, counter)
}
}
}
diff --git a/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt
new file mode 100644
index 00000000..6f97e428
--- /dev/null
+++ b/odcsim-core/src/test/kotlin/com/atlarge/odcsim/BehaviorTest.kt
@@ -0,0 +1,78 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2018 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim
+
+import com.atlarge.odcsim.dsl.setup
+import com.atlarge.odcsim.internal.BehaviorInterpreter
+import com.nhaarman.mockitokotlin2.mock
+import org.junit.jupiter.api.DisplayName
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+
+/**
+ * Test suite for [Behavior] and [BehaviorInterpreter].
+ */
+@DisplayName("Behavior")
+class BehaviorTest {
+ /**
+ * Test whether we cannot start an actor with the [Behavior.Companion.unhandled] behavior.
+ */
+ @Test
+ fun `should not start with unhandled behavior`() {
+ val ctx = mock<ActorContext<Unit>>()
+ val interpreter = BehaviorInterpreter(Behavior.unhandled<Unit>())
+ assertThrows<IllegalArgumentException> { interpreter.start(ctx) }
+ }
+
+ /**
+ * Test whether we cannot start an actor with deferred unhandled behavior.
+ */
+ @Test
+ fun `should not start with deferred unhandled behavior`() {
+ val ctx = mock<ActorContext<Unit>>()
+ val interpreter = BehaviorInterpreter(Behavior.setup<Unit> { Behavior.unhandled() })
+ assertThrows<IllegalArgumentException> { interpreter.start(ctx) }
+ }
+
+ /**
+ * Test whether deferred behavior that returns [Behavior.Companion.same] fails.
+ */
+ @Test
+ fun `should not allow setup to return same`() {
+ val ctx = mock<ActorContext<Unit>>()
+ val interpreter = BehaviorInterpreter(Behavior.setup<Unit> { Behavior.same() })
+ assertThrows<IllegalArgumentException> { interpreter.start(ctx) }
+ }
+
+ /**
+ * Test whether deferred behavior that returns [Behavior.Companion.unhandled] fails.
+ */
+ @Test
+ fun `should not allow setup to return unhandled`() {
+ val ctx = mock<ActorContext<Unit>>()
+ val interpreter = BehaviorInterpreter(Behavior.setup<Unit> { Behavior.unhandled() })
+ assertThrows<IllegalArgumentException> { interpreter.start(ctx) }
+ }
+}
diff --git a/odcsim-engine-omega/build.gradle b/odcsim-engine-omega/build.gradle
index b8399a4e..f60dbe89 100644
--- a/odcsim-engine-omega/build.gradle
+++ b/odcsim-engine-omega/build.gradle
@@ -35,9 +35,11 @@ dependencies {
api project(':odcsim-core')
implementation "org.jetbrains.kotlin:kotlin-stdlib"
+ implementation "io.github.microutils:kotlin-logging:1.6.20"
testCompile project(path: ':odcsim-core', configuration: 'tests')
testImplementation "org.junit.jupiter:junit-jupiter-api:$junit_jupiter_version"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junit_jupiter_version"
testImplementation "org.junit.platform:junit-platform-launcher:$junit_platform_version"
+ testRuntimeOnly "org.slf4j:slf4j-simple:1.7.25"
}
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index 56e3020f..c4a9b35f 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -35,6 +35,7 @@ import com.atlarge.odcsim.PostStop
import com.atlarge.odcsim.PreStart
import com.atlarge.odcsim.Signal
import com.atlarge.odcsim.internal.BehaviorInterpreter
+import mu.KotlinLogging
import java.util.PriorityQueue
import kotlin.math.max
@@ -75,6 +76,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
*/
private val registry: MutableMap<ActorPath, Actor<*>> = HashMap()
+ /**
+ * A [KotlinLogging] instance that writes logs to a SLF4J implementation.
+ */
+ private val logger = KotlinLogging.logger {}
+
init {
registry[path] = Actor(this, root)
schedule(this, PreStart, .0)
@@ -100,8 +106,16 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
time = delivery
queue.poll()
- // Notice that messages for unknown/terminated actors are ignored for now
- registry[envelope.destination]?.interpretMessage(envelope.message)
+ val actor = registry[envelope.destination] ?: continue
+ try {
+ // Notice that messages for unknown/terminated actors are ignored for now
+ actor.interpretMessage(envelope.message)
+ } catch (e: Exception) {
+ // Forcefully stop the actor if it crashed
+ actor.stop()
+
+ logger.error(e) { "Unhandled exception in actor ${envelope.destination}" }
+ }
}
// Jump forward in time as the caller expects the system to have run until the specified instant
@@ -171,7 +185,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
* Terminate this actor and its children.
*/
fun terminate() {
- children.forEach { it.terminate() }
+ children.forEach { it.stop() }
registry.remove(self.path)
}