diff options
Diffstat (limited to 'odcsim')
| -rw-r--r-- | odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt | 52 | ||||
| -rw-r--r-- | odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt | 47 |
2 files changed, 84 insertions, 15 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt new file mode 100644 index 00000000..b15ce3ae --- /dev/null +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim + +suspend fun <T : Any, U : Any> SendRef<T>.ask(block: (SendRef<U>) -> T): U { + val ctx = processContext + val outlet = ctx.connect(this) + val channel = ctx.open<U>() + try { + outlet.send(block(channel.send)) + } finally { + outlet.close() + } + + val inlet = ctx.listen(channel.receive) + try { + return inlet.receive() + } finally { + inlet.close() + } +} + +suspend fun <T : Any> SendRef<T>.sendOnce(msg: T) { + val outlet = processContext.connect(this) + try { + outlet.send(msg) + } finally { + outlet.close() + } +} diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 34e5fd9a..11dae528 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.selects.SelectClause1 @@ -89,20 +91,27 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : private val channels: MutableSet<ChannelImpl<*>> = HashSet() /** + * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence + * (but at the same time) may be differently ordered in the internal priority queue (queue) since it does not + * guarantee insertion order. + */ + private var nextId: Long = 0 + + /** * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. */ @InternalCoroutinesApi private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, block)) + schedule(Event.Dispatch(clock.time, nextId++, block)) } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - schedule(Event.Resume(clock.time + timeMillis, this, continuation)) + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, block) + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) schedule(event) return event } @@ -174,12 +183,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> { + val job = SupervisorJob() + override val clock: Clock get() = this@OmegaSimulationEngine.clock override fun spawn(behavior: Behavior): ProcessRef { val name = "$" + UUID.randomUUID() - return spawn(behavior, name) + return this@OmegaSimulationEngine.spawn(behavior, name) } override fun spawn(behavior: Behavior, name: String): ProcessRef { @@ -209,13 +220,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : // Stop the logical process if (result.isFailure) { result.exceptionOrNull()!!.printStackTrace() + job.completeExceptionally(result.exceptionOrNull()!!) + } else { + job.complete() } } override val key: CoroutineContext.Key<*> = ProcessContext.Key @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + override val context: CoroutineContext = this + dispatcher + job } /** @@ -245,7 +259,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The underlying `kotlinx.coroutines` channel to back this channel implementation. */ - private val channel = KChannel<T>(KChannel.CONFLATED) + private val channel = KChannel<T>(KChannel.UNLIMITED) val onReceive: SelectClause1<T> get() = channel.onReceive @@ -275,7 +289,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun send(message: T) { check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, channelImpl, message)) + schedule(Event.Send(clock.time, nextId++, channelImpl, message)) } } @@ -305,17 +319,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : * * @property time The point in time to deliver the message. */ - private sealed class Event(val time: Long) : Comparable<Event>, Runnable { - override fun compareTo(other: Event): Int = time.compareTo(other.time) + private sealed class Event(val time: Long, val id: Long) : Comparable<Event>, Runnable { + override fun compareTo(other: Event): Int { + val cmp = time.compareTo(other.time) + return if (cmp == 0) id.compareTo(other.id) else cmp + } - class Dispatch(time: Long, val block: Runnable) : Event(time) { + class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) { override fun run() = block.run() override fun toString(): String = "Dispatch[$time]" } - class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time) { - @InternalCoroutinesApi + class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time, id) { + @ExperimentalCoroutinesApi override fun run() { with(continuation) { dispatcher.resumeUndispatched(Unit) } } @@ -323,7 +340,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Resume[$time]" } - class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle { + class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle { override fun run() { if (!cancelled) { block.run() @@ -334,10 +351,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : cancelled = true } - override fun toString(): String = "Dispatch[$time]" + override fun toString(): String = "Timeout[$time]" } - class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) { + class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) { override fun run() { channel.send(message) } |
