diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-02-11 13:45:19 +0100 |
|---|---|---|
| committer | Georgios Andreadis <info@gandreadis.com> | 2020-02-11 14:40:15 +0100 |
| commit | 1e6d752d2186987e27059eececf951f68ce98977 (patch) | |
| tree | 2b5b468ca4d810686e0cb1c780822e57a7519ce3 /odcsim | |
| parent | 65a91a92afd8b6e71f08f5cbe345af30606c4861 (diff) | |
bug: Guarantee FIFO order of messages in the queue
This change fixes the issue where messages are not delivered in FIFO
order due to the internal priority not guaranteeing insertion order. For
now, we fix this issue by adding a unique increasing identifier to each
event in the queue.
Diffstat (limited to 'odcsim')
| -rw-r--r-- | odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt | 42 |
1 files changed, 27 insertions, 15 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index 34e5fd9a..767e139a 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Delay import kotlinx.coroutines.DisposableHandle +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive import kotlinx.coroutines.selects.SelectClause1 @@ -88,21 +90,23 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channels: MutableSet<ChannelImpl<*>> = HashSet() + private var nextId: Long = 0 + /** * The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior. */ @InternalCoroutinesApi private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, block)) + schedule(Event.Dispatch(clock.time, nextId++, block)) } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - schedule(Event.Resume(clock.time + timeMillis, this, continuation)) + schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation)) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val event = Event.Timeout(clock.time + timeMillis, block) + val event = Event.Timeout(clock.time + timeMillis, nextId++, block) schedule(event) return event } @@ -174,12 +178,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : } private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> { + val job = SupervisorJob() + override val clock: Clock get() = this@OmegaSimulationEngine.clock override fun spawn(behavior: Behavior): ProcessRef { val name = "$" + UUID.randomUUID() - return spawn(behavior, name) + return this@OmegaSimulationEngine.spawn(behavior, name) } override fun spawn(behavior: Behavior, name: String): ProcessRef { @@ -209,13 +215,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : // Stop the logical process if (result.isFailure) { result.exceptionOrNull()!!.printStackTrace() + job.completeExceptionally(result.exceptionOrNull()!!) + } else { + job.complete() } } override val key: CoroutineContext.Key<*> = ProcessContext.Key @InternalCoroutinesApi - override val context: CoroutineContext = this + dispatcher + override val context: CoroutineContext = this + dispatcher + job } /** @@ -245,7 +254,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : /** * The underlying `kotlinx.coroutines` channel to back this channel implementation. */ - private val channel = KChannel<T>(KChannel.CONFLATED) + private val channel = KChannel<T>(KChannel.UNLIMITED) val onReceive: SelectClause1<T> get() = channel.onReceive @@ -275,7 +284,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun send(message: T) { check(!closed) { "Port is closed" } - schedule(Event.Send(clock.time, channelImpl, message)) + schedule(Event.Send(clock.time, nextId++, channelImpl, message)) } } @@ -305,17 +314,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : * * @property time The point in time to deliver the message. */ - private sealed class Event(val time: Long) : Comparable<Event>, Runnable { - override fun compareTo(other: Event): Int = time.compareTo(other.time) + private sealed class Event(val time: Long, val id: Long) : Comparable<Event>, Runnable { + override fun compareTo(other: Event): Int { + val cmp = time.compareTo(other.time) + return if (cmp == 0) id.compareTo(other.id) else cmp + } - class Dispatch(time: Long, val block: Runnable) : Event(time) { + class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) { override fun run() = block.run() override fun toString(): String = "Dispatch[$time]" } - class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time) { - @InternalCoroutinesApi + class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time, id) { + @ExperimentalCoroutinesApi override fun run() { with(continuation) { dispatcher.resumeUndispatched(Unit) } } @@ -323,7 +335,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Resume[$time]" } - class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle { + class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle { override fun run() { if (!cancelled) { block.run() @@ -334,10 +346,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : cancelled = true } - override fun toString(): String = "Dispatch[$time]" + override fun toString(): String = "Timeout[$time]" } - class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) { + class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) { override fun run() { channel.send(message) } |
