summaryrefslogtreecommitdiff
path: root/odcsim
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-02-11 13:45:19 +0100
committerGeorgios Andreadis <info@gandreadis.com>2020-02-11 14:40:15 +0100
commit1e6d752d2186987e27059eececf951f68ce98977 (patch)
tree2b5b468ca4d810686e0cb1c780822e57a7519ce3 /odcsim
parent65a91a92afd8b6e71f08f5cbe345af30606c4861 (diff)
bug: Guarantee FIFO order of messages in the queue
This change fixes the issue where messages are not delivered in FIFO order due to the internal priority not guaranteeing insertion order. For now, we fix this issue by adding a unique increasing identifier to each event in the queue.
Diffstat (limited to 'odcsim')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt42
1 files changed, 27 insertions, 15 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 34e5fd9a..767e139a 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.SelectClause1
@@ -88,21 +90,23 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private val channels: MutableSet<ChannelImpl<*>> = HashSet()
+ private var nextId: Long = 0
+
/**
* The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
*/
@InternalCoroutinesApi
private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, block))
+ schedule(Event.Dispatch(clock.time, nextId++, block))
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, this, continuation))
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, block)
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
schedule(event)
return event
}
@@ -174,12 +178,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
+ val job = SupervisorJob()
+
override val clock: Clock
get() = this@OmegaSimulationEngine.clock
override fun spawn(behavior: Behavior): ProcessRef {
val name = "$" + UUID.randomUUID()
- return spawn(behavior, name)
+ return this@OmegaSimulationEngine.spawn(behavior, name)
}
override fun spawn(behavior: Behavior, name: String): ProcessRef {
@@ -209,13 +215,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
// Stop the logical process
if (result.isFailure) {
result.exceptionOrNull()!!.printStackTrace()
+ job.completeExceptionally(result.exceptionOrNull()!!)
+ } else {
+ job.complete()
}
}
override val key: CoroutineContext.Key<*> = ProcessContext.Key
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher
+ override val context: CoroutineContext = this + dispatcher + job
}
/**
@@ -245,7 +254,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The underlying `kotlinx.coroutines` channel to back this channel implementation.
*/
- private val channel = KChannel<T>(KChannel.CONFLATED)
+ private val channel = KChannel<T>(KChannel.UNLIMITED)
val onReceive: SelectClause1<T>
get() = channel.onReceive
@@ -275,7 +284,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun send(message: T) {
check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, channelImpl, message))
+ schedule(Event.Send(clock.time, nextId++, channelImpl, message))
}
}
@@ -305,17 +314,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*
* @property time The point in time to deliver the message.
*/
- private sealed class Event(val time: Long) : Comparable<Event>, Runnable {
- override fun compareTo(other: Event): Int = time.compareTo(other.time)
+ private sealed class Event(val time: Long, val id: Long) : Comparable<Event>, Runnable {
+ override fun compareTo(other: Event): Int {
+ val cmp = time.compareTo(other.time)
+ return if (cmp == 0) id.compareTo(other.id) else cmp
+ }
- class Dispatch(time: Long, val block: Runnable) : Event(time) {
+ class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) {
override fun run() = block.run()
override fun toString(): String = "Dispatch[$time]"
}
- class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time) {
- @InternalCoroutinesApi
+ class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time, id) {
+ @ExperimentalCoroutinesApi
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
@@ -323,7 +335,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Resume[$time]"
}
- class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle {
+ class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle {
override fun run() {
if (!cancelled) {
block.run()
@@ -334,10 +346,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
cancelled = true
}
- override fun toString(): String = "Dispatch[$time]"
+ override fun toString(): String = "Timeout[$time]"
}
- class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
+ class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
override fun run() {
channel.send(message)
}