summaryrefslogtreecommitdiff
path: root/odcsim/odcsim-engine-omega
diff options
context:
space:
mode:
Diffstat (limited to 'odcsim/odcsim-engine-omega')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt236
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt10
5 files changed, 69 insertions, 195 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 207d2768..4edf94d2 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -24,14 +24,8 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.Channel
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.ProcessRef
-import com.atlarge.odcsim.ReceivePort
-import com.atlarge.odcsim.ReceiveRef
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.SendRef
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.engine.omega.logging.LoggerImpl
import java.time.Clock
@@ -39,21 +33,19 @@ import java.time.Instant
import java.time.ZoneId
import java.util.PriorityQueue
import java.util.UUID
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
-import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import org.slf4j.Logger
@@ -63,10 +55,9 @@ import org.slf4j.Logger
* This engine implementation is a single-threaded implementation, running logical processes synchronously and
* provides a single priority queue for all events (messages, ticks, etc) that occur.
*
- * @param rootBehavior The behavior of the root actor.
* @param name The name of the engine instance.
*/
-class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine {
+public class OmegaSimulationEngine(override val name: String) : SimulationEngine {
/**
* The state of the actor system.
*/
@@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The active processes in the simulation engine.
*/
- private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap()
-
- /**
- * The channels that have been registered by this engine.
- */
- private val channels: MutableSet<ChannelImpl<*>> = HashSet()
+ private val registry: MutableMap<String, Domain> = HashMap()
/**
* A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence
@@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private var nextId: Long = 0
- /**
- * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
- */
- @InternalCoroutinesApi
- private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, nextId++, block))
- }
+ override fun newDomain(): Domain = newDomain(null)
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
- }
-
- override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
- schedule(event)
- return event
- }
- }
-
- init {
- spawn(rootBehavior, "/")
- }
+ override fun newDomain(name: String): Domain = newDomain(name, null)
override suspend fun run() {
check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" }
@@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Spawn a new logical process in the simulation universe.
+ * Spawn a new simulation domain.
*/
- private fun spawn(behavior: Behavior, name: String): ProcessRef {
- val ref = ProcessRefImpl(this, name)
- require(ref !in registry) { "Process name $name not unique" }
- val lp = ProcessImpl(ref, behavior)
- registry[ref] = lp
- lp.start()
- return ref
+ private fun newDomainImpl(name: String, parent: DomainImpl?): Domain {
+ val domain = DomainImpl(name, parent)
+ require(domain.path !in registry) { "Domain name $name not unique" }
+ registry[domain.path] = domain
+ return domain
}
- /**
- * Register a new communication channel.
- */
- private fun <T : Any> open(): Channel<T> {
- val channel = ChannelImpl<T>()
- channels += channel
- return channel
+ private fun newDomain(parent: DomainImpl?): Domain {
+ val name = "$" + UUID.randomUUID()
+ return newDomainImpl(name, null)
+ }
+
+ private fun newDomain(name: String, parent: DomainImpl?): Domain {
+ require(name.isNotEmpty()) { "Domain name may not be empty" }
+ require(!name.startsWith("$")) { "Domain name may not start with $-sign" }
+ require(!name.contains("/")) { "Domain name may not contain /" }
+ return newDomainImpl(name, parent)
}
- private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
- val job = SupervisorJob()
+ private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain {
+ val job: Job = SupervisorJob(parent?.job)
+ val path: String = (parent?.path ?: "") + "/$name"
- override val clock: Clock
- get() = this@OmegaSimulationEngine.clock
+ @InternalCoroutinesApi
+ private val dispatcher = object : CoroutineDispatcher(), Delay {
+ // CoroutineDispatcher
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, nextId++, block))
+ }
- override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
+ // Delay
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
+ }
- override fun spawn(behavior: Behavior): ProcessRef {
- val name = "$" + UUID.randomUUID()
- return this@OmegaSimulationEngine.spawn(behavior, name)
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
+ schedule(event)
+ return event
+ }
}
- override fun spawn(behavior: Behavior, name: String): ProcessRef {
- require(name.isNotEmpty()) { "Process name may not be empty" }
- require(!name.startsWith("$")) { "Process name may not start with $-sign" }
- return this@OmegaSimulationEngine.spawn(behavior, name)
- }
+ // SimulationContext
+ override val key: CoroutineContext.Key<*> = SimulationContext.Key
- override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open()
+ override val domain: Domain = this
- override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> {
- require(send is ChannelImpl && send in channels) { "Invalid reference to channel" }
- return SendPortImpl(send)
- }
+ override val clock: VirtualClock
+ get() = this@OmegaSimulationEngine.clock
- override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> {
- require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" }
- return ReceivePortImpl(receive)
- }
+ override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
- /**
- * Start this logical process.
- */
- fun start() = behavior.startCoroutine(this, this)
-
- override fun resumeWith(result: Result<Unit>) {
- // Stop the logical process
- if (result.isFailure) {
- result.exceptionOrNull()!!.printStackTrace()
- job.completeExceptionally(result.exceptionOrNull()!!)
- } else {
- job.complete()
- }
- }
+ override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this)
- override val key: CoroutineContext.Key<*> = ProcessContext.Key
+ override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this)
- @InternalCoroutinesApi
- override val coroutineContext: CoroutineContext
- get() = context
+ // Domain
+ override val parent: Domain = parent ?: this
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher + job
+ override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job
+
+ override fun toString(): String = path
}
/**
@@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Internal [ProcessRef] implementation for this simulation engine.
- */
- private data class ProcessRefImpl(
- private val owner: OmegaSimulationEngine,
- override val name: String
- ) : ProcessRef {
- override fun toString(): String = "Process[$name]"
- }
-
- /**
- * Internal [Channel] implementation.
- */
- private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> {
- override val send: SendRef<T> = this
- override val receive: ReceiveRef<T> = this
-
- /**
- * The underlying `kotlinx.coroutines` channel to back this channel implementation.
- */
- private val channel = KChannel<T>(KChannel.UNLIMITED)
-
- val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- /**
- * Receive a message from this channel.
- */
- suspend fun receive(): T = channel.receive()
-
- /**
- * Send a message to this channel.
- */
- fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" }
- }
-
- private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override fun send(message: T) {
- check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, nextId++, channelImpl, message))
- }
- }
-
- private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- override suspend fun receive(): T {
- check(!closed) { "Port is closed" }
- return channel.receive()
- }
- }
-
- /**
* A wrapper around a message that has been scheduled for processing.
*
* @property time The point in time to deliver the message.
@@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Timeout[$time]"
}
-
- class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
- override fun run() {
- channel.send(message)
- }
- }
}
/**
* A virtual [Clock] implementation for keeping track of simulation time.
*/
private data class VirtualClock(var time: Long) : Clock() {
- override fun withZone(zone: ZoneId?): Clock = TODO("not implemented")
+ override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError()
override fun getZone(): ZoneId = ZoneId.systemDefault()
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
index 75bb2265..5dba3233 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
@@ -24,7 +24,6 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.SimulationEngineProvider
import java.util.ServiceLoader
@@ -33,7 +32,6 @@ import java.util.ServiceLoader
* An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create
* [OmegaSimulationEngine] instances.
*/
-class OmegaSimulationEngineProvider : SimulationEngineProvider {
- override operator fun invoke(root: Behavior, name: String): SimulationEngine =
- OmegaSimulationEngine(root, name)
+public class OmegaSimulationEngineProvider : SimulationEngineProvider {
+ override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name)
}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
index 5db989e8..fca4826e 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
import org.slf4j.helpers.MessageFormatter
@@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation that is aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [LocationAwareLogger] to delegate the messages to.
*/
internal class LocationAwareLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: LocationAwareLogger
) : LoggerImpl(ctx), Logger by delegate {
/**
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
index b77e85e7..856cecfa 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
@@ -24,18 +24,18 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
/**
* A [Logger] implementation that is not aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [Logger] to delegate the messages to.
*/
internal class LocationIgnorantLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: Logger
) : LoggerImpl(ctx), Logger by delegate {
override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) {
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
index 5a52473b..1adcfdc0 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
@@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
*/
-internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger {
+internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger {
/**
* Configure [MDC] with actor-specific information.
*/
protected inline fun withMdc(block: () -> Unit) {
- MDC.put(MDC_PROCESS_REF, ctx.self.name)
+ MDC.put(MDC_PROCESS_REF, ctx.domain.name)
MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis()))
try {
block()
@@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce
*
* @param ctx The actor context to create the logger for.
*/
- operator fun invoke(ctx: ProcessContext): Logger {
+ operator fun invoke(ctx: SimulationContext): Logger {
val logger = LoggerFactory.getLogger(ctx.javaClass)
return if (logger is LocationAwareLogger) {
LocationAwareLoggerImpl(ctx, logger)