diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-16 20:30:17 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-28 14:28:29 +0100 |
| commit | b82e573c67f0004945aa18c575268100fb279b56 (patch) | |
| tree | 7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 /odcsim/odcsim-engine-omega/src | |
| parent | 0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff) | |
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to
simulation domains. This prevents the clash with "processes" that we are
trying to simulate.
In addition, simulation domains allows us to reduce the amount of
boilerplate and instead allows for simulation modelled using standard
techniques.
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
5 files changed, 69 insertions, 195 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 207d2768..4edf94d2 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -24,14 +24,8 @@ package com.atlarge.odcsim.engine.omega -import com.atlarge.odcsim.Behavior -import com.atlarge.odcsim.Channel -import com.atlarge.odcsim.ProcessContext -import com.atlarge.odcsim.ProcessRef -import com.atlarge.odcsim.ReceivePort -import com.atlarge.odcsim.ReceiveRef -import com.atlarge.odcsim.SendPort -import com.atlarge.odcsim.SendRef +import com.atlarge.odcsim.Domain +import com.atlarge.odcsim.SimulationContext import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.engine.omega.logging.LoggerImpl import java.time.Clock @@ -39,21 +33,19 @@ import java.time.Instant import java.time.ZoneId import java.util.PriorityQueue import java.util.UUID -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -import kotlin.coroutines.startCoroutine import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job import kotlinx.coroutines.Runnable import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive -import kotlinx.coroutines.selects.SelectClause1 import org.jetbrains.annotations.Async import org.slf4j.Logger @@ -63,10 +55,9 @@ import org.slf4j.Logger * This engine implementation is a single-threaded implementation, running logical processes synchronously and * provides a single priority queue for all events (messages, ticks, etc) that occur. * - * @param rootBehavior The behavior of the root actor. * @param name The name of the engine instance. */ -class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine { +public class OmegaSimulationEngine(override val name: String) : SimulationEngine { /** * The state of the actor system. */ @@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The active processes in the simulation engine. */ - private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap() - - /** - * The channels that have been registered by this engine. - */ - private val channels: MutableSet<ChannelImpl<*>> = HashSet() + private val registry: MutableMap<String, Domain> = HashMap() /** * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence @@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private var nextId: Long = 0 - /** - * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. - */ - @InternalCoroutinesApi - private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { - override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, nextId++, block)) - } + override fun newDomain(): Domain = newDomain(null) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, nextId++, block) - schedule(event) - return event - } - } - - init { - spawn(rootBehavior, "/") - } + override fun newDomain(name: String): Domain = newDomain(name, null) override suspend fun run() { check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" } @@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } /** - * Spawn a new logical process in the simulation universe. + * Spawn a new simulation domain. */ - private fun spawn(behavior: Behavior, name: String): ProcessRef { - val ref = ProcessRefImpl(this, name) - require(ref !in registry) { "Process name $name not unique" } - val lp = ProcessImpl(ref, behavior) - registry[ref] = lp - lp.start() - return ref + private fun newDomainImpl(name: String, parent: DomainImpl?): Domain { + val domain = DomainImpl(name, parent) + require(domain.path !in registry) { "Domain name $name not unique" } + registry[domain.path] = domain + return domain } - /** - * Register a new communication channel. - */ - private fun <T : Any> open(): Channel<T> { - val channel = ChannelImpl<T>() - channels += channel - return channel + private fun newDomain(parent: DomainImpl?): Domain { + val name = "$" + UUID.randomUUID() + return newDomainImpl(name, null) + } + + private fun newDomain(name: String, parent: DomainImpl?): Domain { + require(name.isNotEmpty()) { "Domain name may not be empty" } + require(!name.startsWith("$")) { "Domain name may not start with $-sign" } + require(!name.contains("/")) { "Domain name may not contain /" } + return newDomainImpl(name, parent) } - private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> { - val job = SupervisorJob() + private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain { + val job: Job = SupervisorJob(parent?.job) + val path: String = (parent?.path ?: "") + "/$name" - override val clock: Clock - get() = this@OmegaSimulationEngine.clock + @InternalCoroutinesApi + private val dispatcher = object : CoroutineDispatcher(), Delay { + // CoroutineDispatcher + override fun dispatch(context: CoroutineContext, block: Runnable) { + schedule(Event.Dispatch(clock.time, nextId++, block)) + } - override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) } + // Delay + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) + } - override fun spawn(behavior: Behavior): ProcessRef { - val name = "$" + UUID.randomUUID() - return this@OmegaSimulationEngine.spawn(behavior, name) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) + schedule(event) + return event + } } - override fun spawn(behavior: Behavior, name: String): ProcessRef { - require(name.isNotEmpty()) { "Process name may not be empty" } - require(!name.startsWith("$")) { "Process name may not start with $-sign" } - return this@OmegaSimulationEngine.spawn(behavior, name) - } + // SimulationContext + override val key: CoroutineContext.Key<*> = SimulationContext.Key - override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open() + override val domain: Domain = this - override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> { - require(send is ChannelImpl && send in channels) { "Invalid reference to channel" } - return SendPortImpl(send) - } + override val clock: VirtualClock + get() = this@OmegaSimulationEngine.clock - override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> { - require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" } - return ReceivePortImpl(receive) - } + override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) } - /** - * Start this logical process. - */ - fun start() = behavior.startCoroutine(this, this) - - override fun resumeWith(result: Result<Unit>) { - // Stop the logical process - if (result.isFailure) { - result.exceptionOrNull()!!.printStackTrace() - job.completeExceptionally(result.exceptionOrNull()!!) - } else { - job.complete() - } - } + override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this) - override val key: CoroutineContext.Key<*> = ProcessContext.Key + override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this) - @InternalCoroutinesApi - override val coroutineContext: CoroutineContext - get() = context + // Domain + override val parent: Domain = parent ?: this @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + job + override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job + + override fun toString(): String = path } /** @@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } /** - * Internal [ProcessRef] implementation for this simulation engine. - */ - private data class ProcessRefImpl( - private val owner: OmegaSimulationEngine, - override val name: String - ) : ProcessRef { - override fun toString(): String = "Process[$name]" - } - - /** - * Internal [Channel] implementation. - */ - private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> { - override val send: SendRef<T> = this - override val receive: ReceiveRef<T> = this - - /** - * The underlying `kotlinx.coroutines` channel to back this channel implementation. - */ - private val channel = KChannel<T>(KChannel.UNLIMITED) - - val onReceive: SelectClause1<T> - get() = channel.onReceive - - /** - * Receive a message from this channel. - */ - suspend fun receive(): T = channel.receive() - - /** - * Send a message to this channel. - */ - fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" } - } - - private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> { - private var closed = false - - override fun close(): Boolean { - if (closed) { - return false - } - - closed = true - return true - } - - override fun send(message: T) { - check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, nextId++, channelImpl, message)) - } - } - - private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> { - private var closed = false - - override fun close(): Boolean { - if (closed) { - return false - } - - closed = true - return true - } - - override val onReceive: SelectClause1<T> - get() = channel.onReceive - - override suspend fun receive(): T { - check(!closed) { "Port is closed" } - return channel.receive() - } - } - - /** * A wrapper around a message that has been scheduled for processing. * * @property time The point in time to deliver the message. @@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Timeout[$time]" } - - class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) { - override fun run() { - channel.send(message) - } - } } /** * A virtual [Clock] implementation for keeping track of simulation time. */ private data class VirtualClock(var time: Long) : Clock() { - override fun withZone(zone: ZoneId?): Clock = TODO("not implemented") + override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError() override fun getZone(): ZoneId = ZoneId.systemDefault() diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt index 75bb2265..5dba3233 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt @@ -24,7 +24,6 @@ package com.atlarge.odcsim.engine.omega -import com.atlarge.odcsim.Behavior import com.atlarge.odcsim.SimulationEngine import com.atlarge.odcsim.SimulationEngineProvider import java.util.ServiceLoader @@ -33,7 +32,6 @@ import java.util.ServiceLoader * An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create * [OmegaSimulationEngine] instances. */ -class OmegaSimulationEngineProvider : SimulationEngineProvider { - override operator fun invoke(root: Behavior, name: String): SimulationEngine = - OmegaSimulationEngine(root, name) +public class OmegaSimulationEngineProvider : SimulationEngineProvider { + override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name) } diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt index 5db989e8..fca4826e 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.Marker import org.slf4j.helpers.MessageFormatter @@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger /** * An actor-specific [Logger] implementation that is aware of the calling location. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. * @param delegate The [LocationAwareLogger] to delegate the messages to. */ internal class LocationAwareLoggerImpl( - ctx: ProcessContext, + ctx: SimulationContext, private val delegate: LocationAwareLogger ) : LoggerImpl(ctx), Logger by delegate { /** diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt index b77e85e7..856cecfa 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt @@ -24,18 +24,18 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.Marker /** * A [Logger] implementation that is not aware of the calling location. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. * @param delegate The [Logger] to delegate the messages to. */ internal class LocationIgnorantLoggerImpl( - ctx: ProcessContext, + ctx: SimulationContext, private val delegate: Logger ) : LoggerImpl(ctx), Logger by delegate { override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) { diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt index 5a52473b..1adcfdc0 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt @@ -24,7 +24,7 @@ package com.atlarge.odcsim.engine.omega.logging -import com.atlarge.odcsim.ProcessContext +import com.atlarge.odcsim.SimulationContext import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC @@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger /** * An actor-specific [Logger] implementation. * - * @param ctx The owning [ProcessContext] of this logger. + * @param ctx The owning [SimulationContext] of this logger. */ -internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger { +internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger { /** * Configure [MDC] with actor-specific information. */ protected inline fun withMdc(block: () -> Unit) { - MDC.put(MDC_PROCESS_REF, ctx.self.name) + MDC.put(MDC_PROCESS_REF, ctx.domain.name) MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis())) try { block() @@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce * * @param ctx The actor context to create the logger for. */ - operator fun invoke(ctx: ProcessContext): Logger { + operator fun invoke(ctx: SimulationContext): Logger { val logger = LoggerFactory.getLogger(ctx.javaClass) return if (logger is LocationAwareLogger) { LocationAwareLoggerImpl(ctx, logger) |
