summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt9
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt41
-rw-r--r--odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt33
-rw-r--r--odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt14
4 files changed, 70 insertions, 27 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt
index cbb80541..bae1fb74 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/ActorSystem.kt
@@ -57,4 +57,13 @@ interface ActorSystem<in T : Any> : ActorRef<T> {
* @param after The delay after which the message should be received by the actor.
*/
fun send(msg: T, after: Duration = 0.1)
+
+ /**
+ * Terminates this actor system.
+ *
+ * This will stop the root actor and in turn will recursively stop all its child actors.
+ *
+ * This is an asynchronous operation.
+ */
+ fun terminate()
}
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index 5a886a6d..566f1ff5 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -62,7 +62,7 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
/**
* A flag to indicate the system has started.
*/
- private var isStarted: Boolean = false
+ private var state: ActorSystemState = ActorSystemState.PENDING
/**
* The event queue to process
@@ -90,9 +90,11 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
require(until >= .0) { "The given instant must be a non-negative number" }
// Start the root actor on initial run
- if (!isStarted) {
- registry[path]!!.start()
- isStarted = true
+ if (state == ActorSystemState.PENDING) {
+ registry[path]!!.isolate { it.start() }
+ state = ActorSystemState.STARTED
+ } else if (state == ActorSystemState.TERMINATED) {
+ throw IllegalStateException("The ActorSystem has been terminated.")
}
while (time < until) {
@@ -107,15 +109,8 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
queue.poll()
val actor = registry[envelope.destination] ?: continue
- try {
- // Notice that messages for unknown/terminated actors are ignored for now
- actor.interpretMessage(envelope.message)
- } catch (e: Exception) {
- // Forcefully stop the actor if it crashed
- actor.stop()
-
- logger.error(e) { "Unhandled exception in actor ${envelope.destination}" }
- }
+ // Notice that messages for unknown/terminated actors are ignored for now
+ actor.isolate { it.interpretMessage(envelope.message) }
}
// Jump forward in time as the caller expects the system to have run until the specified instant
@@ -125,6 +120,10 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun send(msg: T, after: Duration) = schedule(this, msg, after)
+ override fun terminate() {
+ registry[path]?.stop()
+ }
+
/**
* The identifier for the next message to be scheduled.
*/
@@ -222,6 +221,22 @@ class OmegaActorSystem<in T : Any>(root: Behavior<T>, override val name: String)
override fun hashCode(): Int = self.path.hashCode()
}
+ private inline fun <T : Any, U> Actor<T>.isolate(block: (Actor<T>) -> U): U? {
+ return try {
+ block(this)
+ } catch (e: Exception) {
+ // Forcefully stop the actor if it crashed
+ stop()
+ logger.error(e) { "Unhandled exception in actor $path" }
+ null
+ }
+ }
+
+
+ private enum class ActorSystemState {
+ PENDING, STARTED, TERMINATED
+ }
+
private inner class ActorRefImpl<T : Any>(override val path: ActorPath) : ActorRef<T>
/**
diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt
index eb6d38f6..bdcfad55 100644
--- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt
+++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemContract.kt
@@ -60,6 +60,7 @@ abstract class ActorSystemContract {
val system = factory(empty<Unit>(), name)
assertEquals(name, system.name)
+ system.terminate()
}
/**
@@ -70,6 +71,7 @@ abstract class ActorSystemContract {
val system = factory(empty<Unit>(), "test")
assertTrue(system.path is ActorPath.Root)
+ system.terminate()
}
/**
@@ -80,6 +82,7 @@ abstract class ActorSystemContract {
val system = factory(empty<Unit>(), name = "test")
assertEquals(.0, system.time, DELTA)
+ system.terminate()
}
/**
@@ -89,6 +92,7 @@ abstract class ActorSystemContract {
fun `should not accept negative instants for running`() {
val system = factory(empty<Unit>(), name = "test")
assertThrows<IllegalArgumentException> { system.run(-10.0) }
+ system.terminate()
}
/**
@@ -103,6 +107,7 @@ abstract class ActorSystemContract {
system.run(until = until)
system.run(until = until - 0.5)
assertEquals(until, system.time, DELTA)
+ system.terminate()
}
/**
@@ -115,6 +120,7 @@ abstract class ActorSystemContract {
system.run(until = until)
assertEquals(until, system.time, DELTA)
+ system.terminate()
}
/**
@@ -133,6 +139,7 @@ abstract class ActorSystemContract {
system.send(1, after = 1.0)
system.send(2, after = 1.0)
system.run(until = 10.0)
+ system.terminate()
}
/**
@@ -150,6 +157,7 @@ abstract class ActorSystemContract {
system.send(Unit, after = 1.0)
system.run(until = 2.0)
assertEquals(1, counter)
+ system.terminate()
}
/**
@@ -157,7 +165,8 @@ abstract class ActorSystemContract {
*/
@Test
fun `should not initialize root actor if not run`() {
- factory(setup<Unit> { TODO() }, name = "test")
+ val system = factory(setup<Unit> { TODO() }, name = "test")
+ system.terminate()
}
@@ -171,6 +180,7 @@ abstract class ActorSystemContract {
fun `should disallow messages in the past`() {
val system = factory(empty<Unit>(), name = "test")
assertThrows<IllegalArgumentException> { system.send(Unit, after = -1.0) }
+ system.terminate()
}
}
@@ -189,6 +199,7 @@ abstract class ActorSystemContract {
val system = factory(behavior, "test")
system.run()
+ system.terminate()
}
/**
@@ -207,6 +218,7 @@ abstract class ActorSystemContract {
system.run(until = 10.0)
assertTrue(spawned)
+ system.terminate()
}
/**
@@ -223,6 +235,7 @@ abstract class ActorSystemContract {
}, name = "test")
system.run(until = 10.0)
+ system.terminate()
}
/**
@@ -240,6 +253,7 @@ abstract class ActorSystemContract {
ignore()
}, name = "test")
system.run()
+ system.terminate()
}
/**
@@ -254,6 +268,7 @@ abstract class ActorSystemContract {
ignore()
}, name = "test")
system.run()
+ system.terminate()
}
/**
@@ -279,6 +294,7 @@ abstract class ActorSystemContract {
}, name = "test")
system.run()
+ system.terminate()
}
/**
@@ -300,6 +316,7 @@ abstract class ActorSystemContract {
val system = factory(behavior, "test")
system.run()
assertEquals(2, counter)
+ system.terminate()
}
/**
@@ -316,24 +333,17 @@ abstract class ActorSystemContract {
val system = factory(behavior, "test")
system.run()
assertTrue(flag)
+ system.terminate()
}
/**
- * Test whether we cannot start an actor with the [Behavior.Companion.same] behavior.
- */
- @Test
- fun `should not start with same behavior`() {
- val system = factory(same<Unit>(), "test")
- assertThrows<IllegalArgumentException> { system.run() }
- }
-
- /**
- * Test whether we can start an actor with the [Behavior.Companion.stopped] behavior.
+ * Test whether we can start an actor with the [stopped] behavior.
*/
@Test
fun `should start with stopped behavior`() {
val system = factory(stopped<Unit>(), "test")
system.run()
+ system.terminate()
}
@@ -353,6 +363,7 @@ abstract class ActorSystemContract {
system.run()
assertEquals(1, counter)
+ system.terminate()
}
}
diff --git a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt
index 55d70a84..565f4f4c 100644
--- a/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt
+++ b/odcsim-engine-tests/src/main/kotlin/com/atlarge/odcsim/engine/tests/ActorSystemFactoryContract.kt
@@ -27,9 +27,10 @@ package com.atlarge.odcsim.engine.tests
import com.atlarge.odcsim.ActorSystemFactory
import com.atlarge.odcsim.empty
import com.atlarge.odcsim.setup
+import com.atlarge.odcsim.stopped
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
/**
* A conformance test suite for implementors of the [ActorSystemFactory] interface.
@@ -50,6 +51,7 @@ abstract class ActorSystemFactoryContract {
val system = factory(empty<Unit>(), name)
assertEquals(name, system.name)
+ system.terminate()
}
/**
@@ -57,9 +59,15 @@ abstract class ActorSystemFactoryContract {
*/
@Test
fun `should create a system with correct root behavior`() {
+ var flag = false
val factory = createFactory()
- val system = factory(setup<Unit> { throw UnsupportedOperationException() }, "test")
+ val system = factory(setup<Unit> {
+ flag = true
+ stopped()
+ }, "test")
- assertThrows<UnsupportedOperationException> { system.run(until = 10.0) }
+ system.run(until = 10.0)
+ system.terminate()
+ assertTrue(flag)
}
}