summaryrefslogtreecommitdiff
path: root/odcsim
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-16 20:30:17 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-28 14:28:29 +0100
commitb82e573c67f0004945aa18c575268100fb279b56 (patch)
tree7af8e303ea9ab3821e8d8c7c1c7e9b1de058a9e7 /odcsim
parent0c19b32433e2086e72e0d22595f4daa6ef04b64b (diff)
refactor: Change from logical processes to simulation domains
This change moves the simulator terminology from logical processes to simulation domains. This prevents the clash with "processes" that we are trying to simulate. In addition, simulation domains allows us to reduce the amount of boilerplate and instead allows for simulation modelled using standard techniques.
Diffstat (limited to 'odcsim')
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt30
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt70
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt52
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt (renamed from odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt)27
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt71
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt (renamed from odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt)44
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt10
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt7
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt236
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt6
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt10
13 files changed, 108 insertions, 467 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
deleted file mode 100644
index 1116c447..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Behavior.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-/**
- * The behavior of a logical process defines how the process operates within its environments, represented as coroutine.
- */
-public typealias Behavior = suspend (ctx: ProcessContext) -> Unit
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt
deleted file mode 100644
index 10b4908a..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channel.kt
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-import java.io.Serializable
-
-/**
- * A unidirectional communication medium using message passing. Processes may send messages over a channel, and
- * another process is able to receive messages sent over a channel it has a reference to (in form of a [ReceivePort]).
- *
- * Channels are represented by their send and receive endpoints at which processes may respectively send and receive
- * messages from this channel.
- *
- * Channels and their respective send and receive references may be shared freely between logical processes in the same
- * simulation.
- */
-public interface Channel<T : Any> : Serializable {
- /**
- * The endpoint of the channel processes may use to send messages to.
- */
- public val send: SendRef<T>
-
- /**
- * The endpoint of the channel processes may receive messages from.
- */
- public val receive: ReceiveRef<T>
-
- /**
- * Obtain the send endpoint of the channel when unpacking the channel. See [send].
- */
- public operator fun component1(): SendRef<T> = send
-
- /**
- * Obtain the receive endpoint of the channel when unpacking the channel. See [receive].
- */
- public operator fun component2(): ReceiveRef<T> = receive
-}
-
-/**
- * An opaque object representing a [Channel] endpoint through which logical processes can send messages over the
- * channel.
- */
-public interface SendRef<in T : Any> : Serializable
-
-/**
- * An opaque object representing the receive endpoint of a [Channel].
- */
-public interface ReceiveRef<out T : Any> : Serializable
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt
deleted file mode 100644
index b15ce3ae..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-suspend fun <T : Any, U : Any> SendRef<T>.ask(block: (SendRef<U>) -> T): U {
- val ctx = processContext
- val outlet = ctx.connect(this)
- val channel = ctx.open<U>()
- try {
- outlet.send(block(channel.send))
- } finally {
- outlet.close()
- }
-
- val inlet = ctx.listen(channel.receive)
- try {
- return inlet.receive()
- } finally {
- inlet.close()
- }
-}
-
-suspend fun <T : Any> SendRef<T>.sendOnce(msg: T) {
- val outlet = processContext.connect(this)
- try {
- outlet.send(msg)
- } finally {
- outlet.close()
- }
-}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt
index 833458e4..c850952c 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessRef.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Domain.kt
@@ -24,22 +24,31 @@
package com.atlarge.odcsim
-import java.io.Serializable
+import kotlinx.coroutines.CoroutineScope
/**
- * A reference to a logical process in simulation.
+ * An isolated execution unit that runs concurrently in simulation to the other simulation domains. A domain defines a
+ * logical boundary between processes in simulation.
*/
-public interface ProcessRef : Comparable<ProcessRef>, Serializable {
+public interface Domain : CoroutineScope {
/**
- * The name of the process.
+ * The name of this domain.
*/
public val name: String
/**
- * Compare [other] process ref with this process reference for order.
- *
- * @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater
- * than the specified path.
+ * The parent domain to which the lifecycle of this domain is bound. In case this is a root domain, this refers to
+ * itself.
*/
- override fun compareTo(other: ProcessRef): Int = name.compareTo(other.name)
+ public val parent: Domain
+
+ /**
+ * Construct an anonymous simulation sub-domain that is bound to the lifecycle of this domain.
+ */
+ public fun newDomain(): Domain
+
+ /**
+ * Construct a new simulation sub-domain with the specified [name].
+ */
+ public fun newDomain(name: String): Domain
}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
deleted file mode 100644
index 7c730866..00000000
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2019 atlarge-research
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package com.atlarge.odcsim
-
-import kotlinx.coroutines.selects.SelectClause1
-
-/**
- * A communication endpoint of a specific logical process through which messages pass.
- *
- * Ports are tied to a specific logical process and may not be shared with other processes. Doing so results in
- * undefined behavior and might cause violation of time-consistency between processes.
- */
-public interface Port {
- /**
- * Close this communication endpoint. This informs the process(es) at the other end of the port that the caller
- * will not send or receive messages via this port.
- *
- * This is an idempotent operation – subsequent invocations of this function have no effect and return `false`.
- */
- fun close(): Boolean
-}
-
-/**
- * A [Port] through which a logical process may receive messages from other [SendPort]s.
- */
-public interface ReceivePort<out T : Any> : Port {
- /**
- * Receive a message send to this port or suspend the caller while no messages have been received at this port yet.
- */
- public suspend fun receive(): T
-
- /**
- * Clause for select expression for receiving a message from the channel.
- */
- val onReceive: SelectClause1<T>
-}
-
-/**
- * A [Port] through which logical processes may send messages to a [ReceivePort].
- */
-public interface SendPort<in T : Any> : Port {
- /**
- * Send a message via this port to the process(es) listening at the other end of the port.
- *
- * Messages are send asynchronously to the receivers and do not suspend the caller. This method guarantees
- * exactly-once delivery while respecting time-consistency between owner of the send port and its receivers.
- */
- public fun send(message: T)
-}
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
index 30ef4114..c51d1d8b 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/ProcessContext.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationContext.kt
@@ -27,22 +27,21 @@ package com.atlarge.odcsim
import java.time.Clock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import kotlinx.coroutines.CoroutineScope
import org.slf4j.Logger
/**
- * Represents the execution context of a logical process in simulation.
+ * Represents the execution context of a simulation domain.
*/
-public interface ProcessContext : CoroutineContext.Element, CoroutineScope {
+public interface SimulationContext : CoroutineContext.Element {
/**
- * Key for [ProcessContext] instance in the coroutine context.
+ * Key for [SimulationContext] instance in the coroutine context.
*/
- companion object Key : CoroutineContext.Key<ProcessContext>
+ companion object Key : CoroutineContext.Key<SimulationContext>
/**
- * The reference to the logical process of this context.
+ * The reference to the current simulation domain.
*/
- public val self: ProcessRef
+ public val domain: Domain
/**
* The clock tracking the simulation time.
@@ -53,37 +52,12 @@ public interface ProcessContext : CoroutineContext.Element, CoroutineScope {
* A logger instance tied to the logical process.
*/
public val log: Logger
-
- /**
- * Spawn an anonymous logical process in the simulation universe with the specified [behavior].
- */
- public fun spawn(behavior: Behavior): ProcessRef
-
- /**
- * Spawn a logical process in the simulation universe with the specified [behavior] and [name].
- */
- public fun spawn(behavior: Behavior, name: String): ProcessRef
-
- /**
- * Open a new communication [Channel] for messages of type [T].
- */
- public fun <T : Any> open(): Channel<T>
-
- /**
- * Create a [SendPort] for sending messages to the specified [send].
- */
- public suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T>
-
- /**
- * Create a [ReceivePort] for listening to the messages sent to the specified [receive] endpoint of a channel.
- */
- public suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T>
}
/**
- * The process context of the current coroutine.
+ * The simulation context of the current coroutine.
*/
@Suppress("WRONG_MODIFIER_TARGET")
-public suspend inline val processContext: ProcessContext
+public suspend inline val simulationContext: SimulationContext
@Suppress("ILLEGAL_SUSPEND_PROPERTY_ACCESS")
- get() = coroutineContext[ProcessContext.Key] ?: throw IllegalStateException("No process context active")
+ get() = coroutineContext[SimulationContext] ?: throw IllegalStateException("No simulation context available")
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt
index e35acbf0..db05cb1d 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngine.kt
@@ -37,6 +37,16 @@ public interface SimulationEngine {
public val name: String
/**
+ * Construct an anonymous root simulation domain.
+ */
+ public fun newDomain(): Domain
+
+ /**
+ * Construct a new root simulation domain with the specified [name].
+ */
+ public fun newDomain(name: String): Domain
+
+ /**
* Run the simulation.
*/
public suspend fun run()
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt
index 93dda963..a975fa3c 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/SimulationEngineProvider.kt
@@ -29,10 +29,7 @@ package com.atlarge.odcsim
*/
public interface SimulationEngineProvider {
/**
- * Create an [SimulationEngine] with the given root [Behavior] and the given name.
- *
- * @param root The behavior of the root process.
- * @param name The name of the engine instance.
+ * Construct an [SimulationEngine] with the given [name].
*/
- public operator fun invoke(root: Behavior, name: String): SimulationEngine
+ public operator fun invoke(name: String): SimulationEngine
}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 207d2768..4edf94d2 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -24,14 +24,8 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
-import com.atlarge.odcsim.Channel
-import com.atlarge.odcsim.ProcessContext
-import com.atlarge.odcsim.ProcessRef
-import com.atlarge.odcsim.ReceivePort
-import com.atlarge.odcsim.ReceiveRef
-import com.atlarge.odcsim.SendPort
-import com.atlarge.odcsim.SendRef
+import com.atlarge.odcsim.Domain
+import com.atlarge.odcsim.SimulationContext
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.engine.omega.logging.LoggerImpl
import java.time.Clock
@@ -39,21 +33,19 @@ import java.time.Instant
import java.time.ZoneId
import java.util.PriorityQueue
import java.util.UUID
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
-import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.Job
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
-import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import org.slf4j.Logger
@@ -63,10 +55,9 @@ import org.slf4j.Logger
* This engine implementation is a single-threaded implementation, running logical processes synchronously and
* provides a single priority queue for all events (messages, ticks, etc) that occur.
*
- * @param rootBehavior The behavior of the root actor.
* @param name The name of the engine instance.
*/
-class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : SimulationEngine {
+public class OmegaSimulationEngine(override val name: String) : SimulationEngine {
/**
* The state of the actor system.
*/
@@ -85,12 +76,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The active processes in the simulation engine.
*/
- private val registry: MutableMap<ProcessRef, ProcessImpl> = HashMap()
-
- /**
- * The channels that have been registered by this engine.
- */
- private val channels: MutableSet<ChannelImpl<*>> = HashSet()
+ private val registry: MutableMap<String, Domain> = HashMap()
/**
* A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence
@@ -99,29 +85,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private var nextId: Long = 0
- /**
- * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
- */
- @InternalCoroutinesApi
- private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, nextId++, block))
- }
+ override fun newDomain(): Domain = newDomain(null)
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
- }
-
- override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
- schedule(event)
- return event
- }
- }
-
- init {
- spawn(rootBehavior, "/")
- }
+ override fun newDomain(name: String): Domain = newDomain(name, null)
override suspend fun run() {
check(state != SimulationEngineState.TERMINATED) { "The simulation engine is terminated" }
@@ -164,80 +130,71 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Spawn a new logical process in the simulation universe.
+ * Spawn a new simulation domain.
*/
- private fun spawn(behavior: Behavior, name: String): ProcessRef {
- val ref = ProcessRefImpl(this, name)
- require(ref !in registry) { "Process name $name not unique" }
- val lp = ProcessImpl(ref, behavior)
- registry[ref] = lp
- lp.start()
- return ref
+ private fun newDomainImpl(name: String, parent: DomainImpl?): Domain {
+ val domain = DomainImpl(name, parent)
+ require(domain.path !in registry) { "Domain name $name not unique" }
+ registry[domain.path] = domain
+ return domain
}
- /**
- * Register a new communication channel.
- */
- private fun <T : Any> open(): Channel<T> {
- val channel = ChannelImpl<T>()
- channels += channel
- return channel
+ private fun newDomain(parent: DomainImpl?): Domain {
+ val name = "$" + UUID.randomUUID()
+ return newDomainImpl(name, null)
+ }
+
+ private fun newDomain(name: String, parent: DomainImpl?): Domain {
+ require(name.isNotEmpty()) { "Domain name may not be empty" }
+ require(!name.startsWith("$")) { "Domain name may not start with $-sign" }
+ require(!name.contains("/")) { "Domain name may not contain /" }
+ return newDomainImpl(name, parent)
}
- private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
- val job = SupervisorJob()
+ private inner class DomainImpl(override val name: String, parent: DomainImpl?) : SimulationContext, Domain {
+ val job: Job = SupervisorJob(parent?.job)
+ val path: String = (parent?.path ?: "") + "/$name"
- override val clock: Clock
- get() = this@OmegaSimulationEngine.clock
+ @InternalCoroutinesApi
+ private val dispatcher = object : CoroutineDispatcher(), Delay {
+ // CoroutineDispatcher
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, nextId++, block))
+ }
- override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
+ // Delay
+ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
+ }
- override fun spawn(behavior: Behavior): ProcessRef {
- val name = "$" + UUID.randomUUID()
- return this@OmegaSimulationEngine.spawn(behavior, name)
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
+ schedule(event)
+ return event
+ }
}
- override fun spawn(behavior: Behavior, name: String): ProcessRef {
- require(name.isNotEmpty()) { "Process name may not be empty" }
- require(!name.startsWith("$")) { "Process name may not start with $-sign" }
- return this@OmegaSimulationEngine.spawn(behavior, name)
- }
+ // SimulationContext
+ override val key: CoroutineContext.Key<*> = SimulationContext.Key
- override fun <T : Any> open(): Channel<T> = this@OmegaSimulationEngine.open()
+ override val domain: Domain = this
- override suspend fun <T : Any> connect(send: SendRef<T>): SendPort<T> {
- require(send is ChannelImpl && send in channels) { "Invalid reference to channel" }
- return SendPortImpl(send)
- }
+ override val clock: VirtualClock
+ get() = this@OmegaSimulationEngine.clock
- override suspend fun <T : Any> listen(receive: ReceiveRef<T>): ReceivePort<T> {
- require(receive is ChannelImpl && receive in channels) { "Invalid reference to channel" }
- return ReceivePortImpl(receive)
- }
+ override val log: Logger by lazy(LazyThreadSafetyMode.NONE) { LoggerImpl.invoke(this) }
- /**
- * Start this logical process.
- */
- fun start() = behavior.startCoroutine(this, this)
-
- override fun resumeWith(result: Result<Unit>) {
- // Stop the logical process
- if (result.isFailure) {
- result.exceptionOrNull()!!.printStackTrace()
- job.completeExceptionally(result.exceptionOrNull()!!)
- } else {
- job.complete()
- }
- }
+ override fun newDomain(): Domain = this@OmegaSimulationEngine.newDomain(this)
- override val key: CoroutineContext.Key<*> = ProcessContext.Key
+ override fun newDomain(name: String): Domain = this@OmegaSimulationEngine.newDomain(name, this)
- @InternalCoroutinesApi
- override val coroutineContext: CoroutineContext
- get() = context
+ // Domain
+ override val parent: Domain = parent ?: this
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher + job
+ override val coroutineContext: CoroutineContext = this + CoroutineName(name) + dispatcher + job
+
+ override fun toString(): String = path
}
/**
@@ -248,81 +205,6 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
/**
- * Internal [ProcessRef] implementation for this simulation engine.
- */
- private data class ProcessRefImpl(
- private val owner: OmegaSimulationEngine,
- override val name: String
- ) : ProcessRef {
- override fun toString(): String = "Process[$name]"
- }
-
- /**
- * Internal [Channel] implementation.
- */
- private inner class ChannelImpl<T : Any> : Channel<T>, SendRef<T>, ReceiveRef<T> {
- override val send: SendRef<T> = this
- override val receive: ReceiveRef<T> = this
-
- /**
- * The underlying `kotlinx.coroutines` channel to back this channel implementation.
- */
- private val channel = KChannel<T>(KChannel.UNLIMITED)
-
- val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- /**
- * Receive a message from this channel.
- */
- suspend fun receive(): T = channel.receive()
-
- /**
- * Send a message to this channel.
- */
- fun send(message: T) = assert(channel.offer(message)) { "Failed to send message" }
- }
-
- private inner class SendPortImpl<T : Any>(private val channelImpl: ChannelImpl<T>) : SendPort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override fun send(message: T) {
- check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, nextId++, channelImpl, message))
- }
- }
-
- private class ReceivePortImpl<T : Any>(private val channel: ChannelImpl<T>) : ReceivePort<T> {
- private var closed = false
-
- override fun close(): Boolean {
- if (closed) {
- return false
- }
-
- closed = true
- return true
- }
-
- override val onReceive: SelectClause1<T>
- get() = channel.onReceive
-
- override suspend fun receive(): T {
- check(!closed) { "Port is closed" }
- return channel.receive()
- }
- }
-
- /**
* A wrapper around a message that has been scheduled for processing.
*
* @property time The point in time to deliver the message.
@@ -361,19 +243,13 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Timeout[$time]"
}
-
- class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
- override fun run() {
- channel.send(message)
- }
- }
}
/**
* A virtual [Clock] implementation for keeping track of simulation time.
*/
private data class VirtualClock(var time: Long) : Clock() {
- override fun withZone(zone: ZoneId?): Clock = TODO("not implemented")
+ override fun withZone(zone: ZoneId?): Clock = throw NotImplementedError()
override fun getZone(): ZoneId = ZoneId.systemDefault()
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
index 75bb2265..5dba3233 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngineProvider.kt
@@ -24,7 +24,6 @@
package com.atlarge.odcsim.engine.omega
-import com.atlarge.odcsim.Behavior
import com.atlarge.odcsim.SimulationEngine
import com.atlarge.odcsim.SimulationEngineProvider
import java.util.ServiceLoader
@@ -33,7 +32,6 @@ import java.util.ServiceLoader
* An [SimulationEngineProvider] for the Omega engine, used by the [ServiceLoader] API to create
* [OmegaSimulationEngine] instances.
*/
-class OmegaSimulationEngineProvider : SimulationEngineProvider {
- override operator fun invoke(root: Behavior, name: String): SimulationEngine =
- OmegaSimulationEngine(root, name)
+public class OmegaSimulationEngineProvider : SimulationEngineProvider {
+ override operator fun invoke(name: String): SimulationEngine = OmegaSimulationEngine(name)
}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
index 5db989e8..fca4826e 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationAwareLoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
import org.slf4j.helpers.MessageFormatter
@@ -33,11 +33,11 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation that is aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [LocationAwareLogger] to delegate the messages to.
*/
internal class LocationAwareLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: LocationAwareLogger
) : LoggerImpl(ctx), Logger by delegate {
/**
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
index b77e85e7..856cecfa 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LocationIgnorantLoggerImpl.kt
@@ -24,18 +24,18 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.Marker
/**
* A [Logger] implementation that is not aware of the calling location.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
* @param delegate The [Logger] to delegate the messages to.
*/
internal class LocationIgnorantLoggerImpl(
- ctx: ProcessContext,
+ ctx: SimulationContext,
private val delegate: Logger
) : LoggerImpl(ctx), Logger by delegate {
override fun warn(marker: Marker?, format: String?, arg1: Any?, arg2: Any?) {
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
index 5a52473b..1adcfdc0 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/logging/LoggerImpl.kt
@@ -24,7 +24,7 @@
package com.atlarge.odcsim.engine.omega.logging
-import com.atlarge.odcsim.ProcessContext
+import com.atlarge.odcsim.SimulationContext
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
@@ -33,14 +33,14 @@ import org.slf4j.spi.LocationAwareLogger
/**
* An actor-specific [Logger] implementation.
*
- * @param ctx The owning [ProcessContext] of this logger.
+ * @param ctx The owning [SimulationContext] of this logger.
*/
-internal abstract class LoggerImpl internal constructor(protected val ctx: ProcessContext) : Logger {
+internal abstract class LoggerImpl internal constructor(protected val ctx: SimulationContext) : Logger {
/**
* Configure [MDC] with actor-specific information.
*/
protected inline fun withMdc(block: () -> Unit) {
- MDC.put(MDC_PROCESS_REF, ctx.self.name)
+ MDC.put(MDC_PROCESS_REF, ctx.domain.name)
MDC.put(MDC_PROCESS_TIME, String.format("%d", ctx.clock.millis()))
try {
block()
@@ -62,7 +62,7 @@ internal abstract class LoggerImpl internal constructor(protected val ctx: Proce
*
* @param ctx The actor context to create the logger for.
*/
- operator fun invoke(ctx: ProcessContext): Logger {
+ operator fun invoke(ctx: SimulationContext): Logger {
val logger = LoggerFactory.getLogger(ctx.javaClass)
return if (logger is LocationAwareLogger) {
LocationAwareLoggerImpl(ctx, logger)