diff options
3 files changed, 36 insertions, 34 deletions
diff --git a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt index 1d85a8e6..24c3a9d5 100644 --- a/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt +++ b/odcsim-core/src/main/kotlin/com/atlarge/odcsim/internal/StashBufferImpl.kt @@ -71,5 +71,4 @@ internal class StashBufferImpl<T : Any>(private val capacity: Int) : StashBuffer return interpreter.behavior } - } diff --git a/odcsim-engine-omega/build.gradle.kts b/odcsim-engine-omega/build.gradle.kts index 7f092f05..57e68b2c 100644 --- a/odcsim-engine-omega/build.gradle.kts +++ b/odcsim-engine-omega/build.gradle.kts @@ -40,6 +40,7 @@ dependencies { api(project(":odcsim-core")) implementation(kotlin("stdlib")) + implementation("org.jetbrains:annotations:17.0.0") testCompile(project(":odcsim-engine-tests")) testImplementation("org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion") diff --git a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt index ec088b98..dd92f90a 100644 --- a/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt +++ b/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaActorSystem.kt @@ -39,8 +39,7 @@ import com.atlarge.odcsim.Terminated import com.atlarge.odcsim.empty import com.atlarge.odcsim.internal.BehaviorInterpreter import com.atlarge.odcsim.internal.logging.LoggerImpl -import com.sun.org.apache.xalan.internal.lib.ExsltDatetime.time -import com.sun.xml.internal.messaging.saaj.soap.impl.EnvelopeImpl +import org.jetbrains.annotations.Async import org.slf4j.Logger import java.util.Collections import java.util.PriorityQueue @@ -100,7 +99,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n init { registry[system] = Actor(ActorRefImpl(this, system), empty<Nothing>()) registry[path] = Actor(this, guardianBehavior) - schedule(this, PreStart, .0) + schedule(path, PreStart, .0) } override fun run(until: Duration) { @@ -131,10 +130,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n time = delivery queue.poll() - val actor = registry[envelope.destination] ?: continue - - // Notice that messages for unknown/terminated actors are ignored for now - actor.isolate { it.interpretMessage(envelope.message) } + processEnvelope(envelope) } // Jump forward in time as the caller expects the system to have run until the specified instant @@ -142,7 +138,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n time = max(time, until) } - override fun send(msg: T, after: Duration) = schedule(this, msg, after) + override fun send(msg: T, after: Duration) = schedule(path, msg, after) override fun terminate() { registry[path]?.stop(null) @@ -161,6 +157,35 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n private var nextId: Long = 0 /** + * Schedule a message to be processed by the engine. + * + * @param path The path to the destination of the message. + * @param message The message to schedule. + * @param delay The time to wait before processing the message. + */ + private fun schedule(@Async.Schedule path: ActorPath, message: Any, delay: Duration) { + require(delay >= .0) { "The given delay must be a non-negative number" } + scheduleEnvelope(EnvelopeImpl(nextId++, path, time + delay, message)) + } + + /** + * Schedule the specified envelope to be processed by the engine. + */ + private fun scheduleEnvelope(@Async.Schedule envelope: EnvelopeImpl) { + queue.add(envelope) + } + + /** + * Process the delivery of a message. + */ + private fun processEnvelope(@Async.Execute envelope: EnvelopeImpl) { + val actor = registry[envelope.destination] ?: return + + // Notice that messages for unknown/terminated actors are ignored for now + actor.isolate { it.interpretMessage(envelope.message) } + } + + /** * An actor as represented in the Omega engine. * * @param self The [ActorRef] to this actor. @@ -184,7 +209,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n override fun getChild(name: String): ActorRef<*>? = childActors[name]?.self - override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref, msg, after) + override fun <U : Any> send(ref: ActorRef<U>, msg: U, after: Duration) = schedule(ref.path, msg, after) override fun <U : Any> spawn(behavior: Behavior<U>, name: String): ActorRef<U> { require(name.isNotEmpty()) { "Actor name may not be empty" } @@ -203,7 +228,7 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n val actor = Actor(ref, behavior) registry[ref.path] = actor childActors[name] = actor - schedule(ref, PreStart, .0) + schedule(ref.path, PreStart, .0) actor.start() return ref } @@ -332,27 +357,4 @@ class OmegaActorSystem<in T : Any>(guardianBehavior: Behavior<T>, override val n override val time: Instant, override val message: Any ) : Envelope<Any> - - /** - * Schedule a message to be processed by the engine. - * - * @param destination The destination of the message. - * @param message The message to schedule. - * @param delay The time to wait before processing the message. - */ - private fun schedule(destination: ActorRef<*>, message: Any, delay: Duration) { - schedule(destination.path, message, delay) - } - - /** - * Schedule a message to be processed by the engine. - * - * @param path The path to the destination of the message. - * @param message The message to schedule. - * @param delay The time to wait before processing the message. - */ - private fun schedule(path: ActorPath, message: Any, delay: Duration) { - require(delay >= .0) { "The given delay must be a non-negative number" } - queue.add(EnvelopeImpl(nextId++, path, time + delay, message)) - } } |
