From 1e6d752d2186987e27059eececf951f68ce98977 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 13:45:19 +0100 Subject: bug: Guarantee FIFO order of messages in the queue This change fixes the issue where messages are not delivered in FIFO order due to the internal priority not guaranteeing insertion order. For now, we fix this issue by adding a unique increasing identifier to each event in the queue. --- .../odcsim/engine/omega/OmegaSimulationEngine.kt | 42 ++++++++++++++-------- 1 file changed, 27 insertions(+), 15 deletions(-) (limited to 'odcsim') diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 34e5fd9a..767e139a 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.selects.SelectClause1 @@ -88,21 +90,23 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channels: MutableSet> = HashSet() + private var nextId: Long = 0 + /** * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. */ @InternalCoroutinesApi private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, block)) + schedule(Event.Dispatch(clock.time, nextId++, block)) } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - schedule(Event.Resume(clock.time + timeMillis, this, continuation)) + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, block) + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) schedule(event) return event } @@ -174,12 +178,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation { + val job = SupervisorJob() + override val clock: Clock get() = this@OmegaSimulationEngine.clock override fun spawn(behavior: Behavior): ProcessRef { val name = "$" + UUID.randomUUID() - return spawn(behavior, name) + return this@OmegaSimulationEngine.spawn(behavior, name) } override fun spawn(behavior: Behavior, name: String): ProcessRef { @@ -209,13 +215,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : // Stop the logical process if (result.isFailure) { result.exceptionOrNull()!!.printStackTrace() + job.completeExceptionally(result.exceptionOrNull()!!) + } else { + job.complete() } } override val key: CoroutineContext.Key<*> = ProcessContext.Key @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + override val context: CoroutineContext = this + dispatcher + job } /** @@ -245,7 +254,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The underlying `kotlinx.coroutines` channel to back this channel implementation. */ - private val channel = KChannel(KChannel.CONFLATED) + private val channel = KChannel(KChannel.UNLIMITED) val onReceive: SelectClause1 get() = channel.onReceive @@ -275,7 +284,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun send(message: T) { check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, channelImpl, message)) + schedule(Event.Send(clock.time, nextId++, channelImpl, message)) } } @@ -305,17 +314,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : * * @property time The point in time to deliver the message. */ - private sealed class Event(val time: Long) : Comparable, Runnable { - override fun compareTo(other: Event): Int = time.compareTo(other.time) + private sealed class Event(val time: Long, val id: Long) : Comparable, Runnable { + override fun compareTo(other: Event): Int { + val cmp = time.compareTo(other.time) + return if (cmp == 0) id.compareTo(other.id) else cmp + } - class Dispatch(time: Long, val block: Runnable) : Event(time) { + class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) { override fun run() = block.run() override fun toString(): String = "Dispatch[$time]" } - class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation) : Event(time) { - @InternalCoroutinesApi + class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation) : Event(time, id) { + @ExperimentalCoroutinesApi override fun run() { with(continuation) { dispatcher.resumeUndispatched(Unit) } } @@ -323,7 +335,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Resume[$time]" } - class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle { + class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle { override fun run() { if (!cancelled) { block.run() @@ -334,10 +346,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : cancelled = true } - override fun toString(): String = "Dispatch[$time]" + override fun toString(): String = "Timeout[$time]" } - class Send(time: Long, val channel: ChannelImpl, val message: T) : Event(time) { + class Send(time: Long, id: Long, val channel: ChannelImpl, val message: T) : Event(time, id) { override fun run() { channel.send(message) } -- cgit v1.2.3 From 5de6ec076fa8bc19c34449bcc085dca184d2e17f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 13:50:57 +0100 Subject: feat: Add helper methods for req-res pattern This change adds helper methods for simplifying the request-response pattern commonly used in models. --- .../src/main/kotlin/com/atlarge/odcsim/Channels.kt | 52 ++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt (limited to 'odcsim') diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt new file mode 100644 index 00000000..b15ce3ae --- /dev/null +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.odcsim + +suspend fun SendRef.ask(block: (SendRef) -> T): U { + val ctx = processContext + val outlet = ctx.connect(this) + val channel = ctx.open() + try { + outlet.send(block(channel.send)) + } finally { + outlet.close() + } + + val inlet = ctx.listen(channel.receive) + try { + return inlet.receive() + } finally { + inlet.close() + } +} + +suspend fun SendRef.sendOnce(msg: T) { + val outlet = processContext.connect(this) + try { + outlet.send(msg) + } finally { + outlet.close() + } +} -- cgit v1.2.3 From 56ff9a31c59f271fb5f40bb9d3bed9a6d5b48a6f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 11 Feb 2020 14:23:00 +0100 Subject: docs: Elaborate on unique ids of events --- .../kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'odcsim') diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 767e139a..11dae528 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -90,6 +90,11 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channels: MutableSet> = HashSet() + /** + * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence + * (but at the same time) may be differently ordered in the internal priority queue (queue) since it does not + * guarantee insertion order. + */ private var nextId: Long = 0 /** -- cgit v1.2.3