diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-04-25 21:28:10 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2019-05-13 20:26:47 +0200 |
| commit | 4e6920d5c128b49750408a11850dfa6a7abb1e9e (patch) | |
| tree | cc1e718ee8a04a94c3ec7623e977fb681d47dbcf | |
| parent | 38828ab3708a22bbd321c76d936648a2010f5c60 (diff) | |
feat: Add support for ActorSystem termination
This change adds support for terminating an ActorSystem.
4 files changed, 70 insertions, 27 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt index cbb80541..bae1fb74 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt @@ -57,4 +57,13 @@ interface ActorSystem<in T : Any> : ActorRef<T> { * @param after The delay after which the message should be received by the actor. */ fun send(msg: T, after: Duration = 0.1) + + /** + * Terminates this actor system. + * + * This will stop the root actor and in turn will recursively stop all its child actors. + * + * This is an asynchronous operation. + */ + fun terminate() } diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index 5a886a6d..566f1ff5 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -62,7 +62,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) /** * A flag to indicate the system has started. */ - private var isStarted: Boolean = false + private var state: ActorSystemState = ActorSystemState.PENDING /** * The event queue to process @@ -90,9 +90,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) require(until >= .0) { "The given instant must be a non-negative number" } // Start the root actor on initial run - if (!isStarted) { - registry[path]!!.start() - isStarted = true + if (state == ActorSystemState.PENDING) { + registry[path]!!.isolate { it.start() } + state = ActorSystemState.STARTED + } else if (state == ActorSystemState.TERMINATED) { + throw IllegalStateException("The ActorSystem has been terminated.") } while (time < until) { @@ -107,15 +109,8 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) queue.poll() val actor = registry[envelope.destination] ?: continue - try { - // Notice that messages for unknown/terminated actors are ignored for now - actor.interpretMessage(envelope.message) - } catch (e: Exception) { - // Forcefully stop the actor if it crashed - actor.stop() - - logger.error(e) { "Unhandled exception in actor ${envelope.destination}" } - } + // Notice that messages for unknown/terminated actors are ignored for now + actor.isolate { it.interpretMessage(envelope.message) } } // Jump forward in time as the caller expects the system to have run until the specified instant @@ -125,6 +120,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun send(msg: T, after: Duration) = schedule(this, msg, after) + override fun terminate() { + registry[path]?.stop() + } + /** * The identifier for the next message to be scheduled. */ @@ -222,6 +221,22 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String) override fun hashCode(): Int = self.path.hashCode() } + private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? { + return try { + block(this) + } catch (e: Exception) { + // Forcefully stop the actor if it crashed + stop() + logger.error(e) { "Unhandled exception in actor $path" } + null + } + } + + + private enum class ActorSystemState { + PENDING, STARTED, TERMINATED + } + private inner class ActorRefImpl<T : Any>(override val path: ActorPath) : ActorRef<T> /** diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt index eb6d38f6..bdcfad55 100644 --- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt +++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt @@ -60,6 +60,7 @@ abstract class ActorSystemContract { val system = factory(empty<Unit>(), name) assertEquals(name, system.name) + system.terminate() } /** @@ -70,6 +71,7 @@ abstract class ActorSystemContract { val system = factory(empty<Unit>(), "test") assertTrue(system.path is ActorPath.Root) + system.terminate() } /** @@ -80,6 +82,7 @@ abstract class ActorSystemContract { val system = factory(empty<Unit>(), name = "test") assertEquals(.0, system.time, DELTA) + system.terminate() } /** @@ -89,6 +92,7 @@ abstract class ActorSystemContract { fun `should not accept negative instants for running`() { val system = factory(empty<Unit>(), name = "test") assertThrows<IllegalArgumentException> { system.run(-10.0) } + system.terminate() } /** @@ -103,6 +107,7 @@ abstract class ActorSystemContract { system.run(until = until) system.run(until = until - 0.5) assertEquals(until, system.time, DELTA) + system.terminate() } /** @@ -115,6 +120,7 @@ abstract class ActorSystemContract { system.run(until = until) assertEquals(until, system.time, DELTA) + system.terminate() } /** @@ -133,6 +139,7 @@ abstract class ActorSystemContract { system.send(1, after = 1.0) system.send(2, after = 1.0) system.run(until = 10.0) + system.terminate() } /** @@ -150,6 +157,7 @@ abstract class ActorSystemContract { system.send(Unit, after = 1.0) system.run(until = 2.0) assertEquals(1, counter) + system.terminate() } /** @@ -157,7 +165,8 @@ abstract class ActorSystemContract { */ @Test fun `should not initialize root actor if not run`() { - factory(setup<Unit> { TODO() }, name = "test") + val system = factory(setup<Unit> { TODO() }, name = "test") + system.terminate() } @@ -171,6 +180,7 @@ abstract class ActorSystemContract { fun `should disallow messages in the past`() { val system = factory(empty<Unit>(), name = "test") assertThrows<IllegalArgumentException> { system.send(Unit, after = -1.0) } + system.terminate() } } @@ -189,6 +199,7 @@ abstract class ActorSystemContract { val system = factory(behavior, "test") system.run() + system.terminate() } /** @@ -207,6 +218,7 @@ abstract class ActorSystemContract { system.run(until = 10.0) assertTrue(spawned) + system.terminate() } /** @@ -223,6 +235,7 @@ abstract class ActorSystemContract { }, name = "test") system.run(until = 10.0) + system.terminate() } /** @@ -240,6 +253,7 @@ abstract class ActorSystemContract { ignore() }, name = "test") system.run() + system.terminate() } /** @@ -254,6 +268,7 @@ abstract class ActorSystemContract { ignore() }, name = "test") system.run() + system.terminate() } /** @@ -279,6 +294,7 @@ abstract class ActorSystemContract { }, name = "test") system.run() + system.terminate() } /** @@ -300,6 +316,7 @@ abstract class ActorSystemContract { val system = factory(behavior, "test") system.run() assertEquals(2, counter) + system.terminate() } /** @@ -316,24 +333,17 @@ abstract class ActorSystemContract { val system = factory(behavior, "test") system.run() assertTrue(flag) + system.terminate() } /** - * Test whether we cannot start an actor with the [Behavior.Companion.same] behavior. - */ - @Test - fun `should not start with same behavior`() { - val system = factory(same<Unit>(), "test") - assertThrows<IllegalArgumentException> { system.run() } - } - - /** - * Test whether we can start an actor with the [Behavior.Companion.stopped] behavior. + * Test whether we can start an actor with the [stopped] behavior. */ @Test fun `should start with stopped behavior`() { val system = factory(stopped<Unit>(), "test") system.run() + system.terminate() } @@ -353,6 +363,7 @@ abstract class ActorSystemContract { system.run() assertEquals(1, counter) + system.terminate() } } diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt index 55d70a84..565f4f4c 100644 --- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt +++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt @@ -27,9 +27,10 @@ package com.atlarge.odcsim.engine.tests import com.atlarge.odcsim.ActorSystemFactory import com.atlarge.odcsim.empty import com.atlarge.odcsim.setup +import com.atlarge.odcsim.stopped import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test -import org.junit.jupiter.api.assertThrows /** * A conformance test suite for implementors of the [ActorSystemFactory] interface. @@ -50,6 +51,7 @@ abstract class ActorSystemFactoryContract { val system = factory(empty<Unit>(), name) assertEquals(name, system.name) + system.terminate() } /** @@ -57,9 +59,15 @@ abstract class ActorSystemFactoryContract { */ @Test fun `should create a system with correct root behavior`() { + var flag = false val factory = createFactory() - val system = factory(setup<Unit> { throw UnsupportedOperationException() }, "test") + val system = factory(setup<Unit> { + flag = true + stopped() + }, "test") - assertThrows<UnsupportedOperationException> { system.run(until = 10.0) } + system.run(until = 10.0) + system.terminate() + assertTrue(flag) } } |
