summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt1
-rw-r--r--odcsim-engine-omega/build.gradle.kts1
-rw-r--r--odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt68
3 files changed, 36 insertions, 34 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt
index 1d85a8e6..24c3a9d5 100644
--- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt
+++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt
@@ -71,5 +71,4 @@ internal class StashBufferImpl<T : Any>(private val capacity: Int) : StashBuffer
return interpreter.behavior
}
-
}
diff --git a/odcsim-engine-omega/build.gradle.kts b/odcsim-engine-omega/build.gradle.kts
index 7f092f05..57e68b2c 100644
--- a/odcsim-engine-omega/build.gradle.kts
+++ b/odcsim-engine-omega/build.gradle.kts
@@ -40,6 +40,7 @@ dependencies {
api(project(":odcsim-core"))
implementation(kotlin("stdlib"))
+ implementation("org.jetbrains:annotations:17.0.0")
testCompile(project(":odcsim-engine-tests"))
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion")
diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
index ec088b98..dd92f90a 100644
--- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
+++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt
@@ -39,8 +39,7 @@ import com.atlarge.odcsim.Terminated
import com.atlarge.odcsim.empty
import com.atlarge.odcsim.internal.BehaviorInterpreter
import com.atlarge.odcsim.internal.logging.LoggerImpl
-import com.sun.org.apache.xalan.internal.lib.ExsltDatetime.time
-import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl
+import org.jetbrains.annotations.Async
import org.slf4j.Logger
import java.util.Collections
import java.util.PriorityQueue
@@ -100,7 +99,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
init {
registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>())
registry[path] = Actor(this, guardianBehavior)
- schedule(this, PreStart, .0)
+ schedule(path, PreStart, .0)
}
override fun run(until: Duration) {
@@ -131,10 +130,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
time = delivery
queue.poll()
- val actor = registry[envelope.destination] ?: continue
-
- // Notice that messages for unknown/terminated actors are ignored for now
- actor.isolate { it.interpretMessage(envelope.message) }
+ processEnvelope(envelope)
}
// Jump forward in time as the caller expects the system to have run until the specified instant
@@ -142,7 +138,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
time = max(time, until)
}
- override fun send(msg: T, after: Duration) = schedule(this, msg, after)
+ override fun send(msg: T, after: Duration) = schedule(path, msg, after)
override fun terminate() {
registry[path]?.stop(null)
@@ -161,6 +157,35 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
private var nextId: Long = 0
/**
+ * Schedule a message to be processed by the engine.
+ *
+ * @param path The path to the destination of the message.
+ * @param message The message to schedule.
+ * @param delay The time to wait before processing the message.
+ */
+ private fun schedule(@Async.Schedule path: ActorPath, message: Any, delay: Duration) {
+ require(delay >= .0) { "The given delay must be a non-negative number" }
+ scheduleEnvelope(EnvelopeImpl(nextId++, path, time + delay, message))
+ }
+
+ /**
+ * Schedule the specified envelope to be processed by the engine.
+ */
+ private fun scheduleEnvelope(@Async.Schedule envelope: EnvelopeImpl) {
+ queue.add(envelope)
+ }
+
+ /**
+ * Process the delivery of a message.
+ */
+ private fun processEnvelope(@Async.Execute envelope: EnvelopeImpl) {
+ val actor = registry[envelope.destination] ?: return
+
+ // Notice that messages for unknown/terminated actors are ignored for now
+ actor.isolate { it.interpretMessage(envelope.message) }
+ }
+
+ /**
* An actor as represented in the Omega engine.
*
* @param self The [ActorRef] to this actor.
@@ -184,7 +209,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self
- override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref, msg, after)
+ override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref.path, msg, after)
override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> {
require(name.isNotEmpty()) { "Actor name may not be empty" }
@@ -203,7 +228,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
val actor = Actor(ref, behavior)
registry[ref.path] = actor
childActors[name] = actor
- schedule(ref, PreStart, .0)
+ schedule(ref.path, PreStart, .0)
actor.start()
return ref
}
@@ -332,27 +357,4 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n
override val time: Instant,
override val message: Any
) : Envelope<Any>
-
- /**
- * Schedule a message to be processed by the engine.
- *
- * @param destination The destination of the message.
- * @param message The message to schedule.
- * @param delay The time to wait before processing the message.
- */
- private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) {
- schedule(destination.path, message, delay)
- }
-
- /**
- * Schedule a message to be processed by the engine.
- *
- * @param path The path to the destination of the message.
- * @param message The message to schedule.
- * @param delay The time to wait before processing the message.
- */
- private fun schedule(path: ActorPath, message: Any, delay: Duration) {
- require(delay >= .0) { "The given delay must be a non-negative number" }
- queue.add(EnvelopeImpl(nextId++, path, time + delay, message))
- }
}