summaryrefslogtreecommitdiff
path: root/odcsim/odcsim-engine-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-16 20:30:17 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-28 14:28:29 +0100
commitb82e573c67f0004945aa18c575268100fb279b56 (patch)
tree7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 /odcsim/odcsim-engine-omega/src
parent0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff)
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to simulation domains. This prevents the clash with "processes" that we are trying to simulate. In addition, simulation domains allows us to reduce the amount of boilerplate and instead allows for simulation modelled using standard techniques.
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt236
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt10
5 files changed, 69 insertions, 195 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 207d2768..4edf94d2 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -24,14 +24,8 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.Channel
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.ProcessRef
-import com.atlarge.odcsim.ReceivePort
-import com.atlarge.odcsim.ReceiveRef
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.SendRef
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.engine.omega.logging.LoggerImpl
import java.time.Clock
@@ -39,21 +33,19 @@ import java.time.Instant
import java.time.ZoneId
import java.util.PriorityQueue
import java.util.UUID
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
-import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import org.slf4j.Logger
@@ -63,10 +55,9 @@ import org.slf4j.Logger
* This engine implementation is a single-threaded implementation, running logical processes synchronously and
* provides a single priority queue for all events (messages, ticks, etc) that occur.
*
- * @param rootBehavior The behavior of the root actor.
* @param name The name of the engine instance.
*/
-class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine {
+public class OmegaSimulationEngine(override val name: String) : SimulationEngine {
/**
* The state of the actor system.
*/
@@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The active processes in the simulation engine.
*/
- private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap()
-
- /**
- * The channels that have been registered by this engine.
- */
- private val channels: MutableSet<ChannelImpl<*>> = HashSet()
+ private val registry: MutableMap<String, Domain> = HashMap()
/**
* A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence
@@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private var nextId: Long = 0
- /**
- * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
- */
- @InternalCoroutinesApi
- private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, nextId++, block))
- }
+ override fun newDomain(): Domain = newDomain(null)
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
- }
-
- override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
- schedule(event)
- return event
- }
- }
-
- init {
- spawn(rootBehavior, "/")
- }
+ override fun newDomain(name: String): Domain = newDomain(name, null)
override suspend fun run() {
check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" }
@@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Spawn a new logical process in the simulation universe.
+ * Spawn a new simulation domain.
*/
- private fun spawn(behavior: Behavior, name: String): ProcessRef {
- val ref = ProcessRefImpl(this, name)
- require(ref !in registry) { "Process name $name not unique" }
- val lp = ProcessImpl(ref, behavior)
- registry[ref] = lp
- lp.start()
- return ref
+ private fun newDomainImpl(name: String, parent: DomainImpl?): Domain {
+ val domain = DomainImpl(name, parent)
+ require(domain.path !in registry) { "Domain name $name not unique" }
+ registry[domain.path] = domain
+ return domain
}
- /**
- * Register a new communication channel.
- */
- private fun <T : Any> open(): Channel<T> {
- val channel = ChannelImpl<T>()
- channels += channel
- return channel
+ private fun newDomain(parent: DomainImpl?): Domain {
+ val name = "$" + UUID.randomUUID()
+ return newDomainImpl(name, null)
+ }
+
+ private fun newDomain(name: String, parent: DomainImpl?): Domain {
+ require(name.isNotEmpty()) { "Domain name may not be empty" }
+ require(!name.startsWith("$")) { "Domain name may not start with $-sign" }
+ require(!name.contains("/")) { "Domain name may not contain /" }
+ return newDomainImpl(name, parent)
}
- private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
- val job = SupervisorJob()
+ private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain {
+ val job: Job = SupervisorJob(parent?.job)
+ val path: String = (parent?.path ?: "") + "/$name"
- override val clock: Clock
- get() = this@OmegaSimulationEngine.clock
+ @InternalCoroutinesApi
+ private val dispatcher = object : CoroutineDispatcher(), Delay {
+ // CoroutineDispatcher
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, nextId++, block))
+ }
- override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
+ // Delay
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
+ }
- override fun spawn(behavior: Behavior): ProcessRef {
- val name = "$" + UUID.randomUUID()
- return this@OmegaSimulationEngine.spawn(behavior, name)
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
+ schedule(event)
+ return event
+ }
}
- override fun spawn(behavior: Behavior, name: String): ProcessRef {
- require(name.isNotEmpty()) { "Process name may not be empty" }
- require(!name.startsWith("$")) { "Process name may not start with $-sign" }
- return this@OmegaSimulationEngine.spawn(behavior, name)
- }
+ // SimulationContext
+ override val key: CoroutineContext.Key<*> = SimulationContext.Key
- override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open()
+ override val domain: Domain = this
- override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> {
- require(send is ChannelImpl && send in channels) { "Invalid reference to channel" }
- return SendPortImpl(send)
- }
+ override val clock: VirtualClock
+ get() = this@OmegaSimulationEngine.clock
- override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> {
- require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" }
- return ReceivePortImpl(receive)
- }
+ override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
- /**
- * Start this logical process.
- */
- fun start() = behavior.startCoroutine(this, this)
-
- override fun resumeWith(result: Result<Unit>) {
- // Stop the logical process
- if (result.isFailure) {
- result.exceptionOrNull()!!.printStackTrace()
- job.completeExceptionally(result.exceptionOrNull()!!)
- } else {
- job.complete()
- }
- }
+ override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this)
- override val key: CoroutineContext.Key<*> = ProcessContext.Key
+ override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this)
- @InternalCoroutinesApi
- override val coroutineContext: CoroutineContext
- get() = context
+ // Domain
+ override val parent: Domain = parent ?: this
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher + job
+ override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job
+
+ override fun toString(): String = path
}
/**
@@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Internal [ProcessRef] implementation for this simulation engine.
- */
- private data class ProcessRefImpl(
- private val owner: OmegaSimulationEngine,
- override val name: String
- ) : ProcessRef {
- override fun toString(): String = "Process[$name]"
- }
-
- /**
- * Internal [Channel] implementation.
- */
- private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> {
- override val send: SendRef<T> = this
- override val receive: ReceiveRef<T> = this
-
- /**
- * The underlying `kotlinx.coroutines` channel to back this channel implementation.
- */
- private val channel = KChannel<T>(KChannel.UNLIMITED)
-
- val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- /**
- * Receive a message from this channel.
- */
- suspend fun receive(): T = channel.receive()
-
- /**
- * Send a message to this channel.
- */
- fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" }
- }
-
- private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override fun send(message: T) {
- check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, nextId++, channelImpl, message))
- }
- }
-
- private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- override suspend fun receive(): T {
- check(!closed) { "Port is closed" }
- return channel.receive()
- }
- }
-
- /**
* A wrapper around a message that has been scheduled for processing.
*
* @property time The point in time to deliver the message.
@@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Timeout[$time]"
}
-
- class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
- override fun run() {
- channel.send(message)
- }
- }
}
/**
* A virtual [Clock] implementation for keeping track of simulation time.
*/
private data class VirtualClock(var time: Long) : Clock() {
- override fun withZone(zone: ZoneId?): Clock = TODO("not implemented")
+ override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError()
override fun getZone(): ZoneId = ZoneId.systemDefault()
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
index 75bb2265..5dba3233 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
@@ -24,7 +24,6 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.SimulationEngineProvider
import java.util.ServiceLoader
@@ -33,7 +32,6 @@ import java.util.ServiceLoader
* An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create
* [OmegaSimulationEngine] instances.
*/
-class OmegaSimulationEngineProvider : SimulationEngineProvider {
- override operator fun invoke(root: Behavior, name: String): SimulationEngine =
- OmegaSimulationEngine(root, name)
+public class OmegaSimulationEngineProvider : SimulationEngineProvider {
+ override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name)
}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
index 5db989e8..fca4826e 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
import org.slf4j.helpers.MessageFormatter
@@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation that is aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [LocationAwareLogger] to delegate the messages to.
*/
internal class LocationAwareLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: LocationAwareLogger
) : LoggerImpl(ctx), Logger by delegate {
/**
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
index b77e85e7..856cecfa 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
@@ -24,18 +24,18 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
/**
* A [Logger] implementation that is not aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [Logger] to delegate the messages to.
*/
internal class LocationIgnorantLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: Logger
) : LoggerImpl(ctx), Logger by delegate {
override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) {
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
index 5a52473b..1adcfdc0 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
@@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
*/
-internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger {
+internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger {
/**
* Configure [MDC] with actor-specific information.
*/
protected inline fun withMdc(block: () -> Unit) {
- MDC.put(MDC_PROCESS_REF, ctx.self.name)
+ MDC.put(MDC_PROCESS_REF, ctx.domain.name)
MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis()))
try {
block()
@@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce
*
* @param ctx The actor context to create the logger for.
*/
- operator fun invoke(ctx: ProcessContext): Logger {
+ operator fun invoke(ctx: SimulationContext): Logger {
val logger = LoggerFactory.getLogger(ctx.javaClass)
return if (logger is LocationAwareLogger) {
LocationAwareLoggerImpl(ctx, logger)