summaryrefslogtreecommitdiff
path: root/odcsim/odcsim-engine-omega/src
diff options
context:
space:
mode:
authorGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-02-11 14:46:19 +0100
committerGeorgios Andreadis <g.andreadis@student.tudelft.nl>2020-02-11 14:46:19 +0100
commitcd293b79ef2066ffcb605b9c625d6ab0a9af1d16 (patch)
treef5ea605d60538480705a0561e5152f1ed74b2188 /odcsim/odcsim-engine-omega/src
parent65a91a92afd8b6e71f08f5cbe345af30606c4861 (diff)
parent8e16b076e9c7c8c086446853e48dfff80cb45ca1 (diff)
Merge branch 'feat/2.x-model' into 'feat/2.x'
Reimplement OpenDC model using 2.x API See merge request opendc/opendc-simulator!21
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt47
1 files changed, 32 insertions, 15 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 34e5fd9a..11dae528 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.SelectClause1
@@ -89,20 +91,27 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
private val channels: MutableSet<ChannelImpl<*>> = HashSet()
/**
+ * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence
+ * (but at the same time) may be differently ordered in the internal priority queue (queue) since it does not
+ * guarantee insertion order.
+ */
+ private var nextId: Long = 0
+
+ /**
* The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
*/
@InternalCoroutinesApi
private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, block))
+ schedule(Event.Dispatch(clock.time, nextId++, block))
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, this, continuation))
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, block)
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
schedule(event)
return event
}
@@ -174,12 +183,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
+ val job = SupervisorJob()
+
override val clock: Clock
get() = this@OmegaSimulationEngine.clock
override fun spawn(behavior: Behavior): ProcessRef {
val name = "$" + UUID.randomUUID()
- return spawn(behavior, name)
+ return this@OmegaSimulationEngine.spawn(behavior, name)
}
override fun spawn(behavior: Behavior, name: String): ProcessRef {
@@ -209,13 +220,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
// Stop the logical process
if (result.isFailure) {
result.exceptionOrNull()!!.printStackTrace()
+ job.completeExceptionally(result.exceptionOrNull()!!)
+ } else {
+ job.complete()
}
}
override val key: CoroutineContext.Key<*> = ProcessContext.Key
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher
+ override val context: CoroutineContext = this + dispatcher + job
}
/**
@@ -245,7 +259,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The underlying `kotlinx.coroutines` channel to back this channel implementation.
*/
- private val channel = KChannel<T>(KChannel.CONFLATED)
+ private val channel = KChannel<T>(KChannel.UNLIMITED)
val onReceive: SelectClause1<T>
get() = channel.onReceive
@@ -275,7 +289,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun send(message: T) {
check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, channelImpl, message))
+ schedule(Event.Send(clock.time, nextId++, channelImpl, message))
}
}
@@ -305,17 +319,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*
* @property time The point in time to deliver the message.
*/
- private sealed class Event(val time: Long) : Comparable<Event>, Runnable {
- override fun compareTo(other: Event): Int = time.compareTo(other.time)
+ private sealed class Event(val time: Long, val id: Long) : Comparable<Event>, Runnable {
+ override fun compareTo(other: Event): Int {
+ val cmp = time.compareTo(other.time)
+ return if (cmp == 0) id.compareTo(other.id) else cmp
+ }
- class Dispatch(time: Long, val block: Runnable) : Event(time) {
+ class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) {
override fun run() = block.run()
override fun toString(): String = "Dispatch[$time]"
}
- class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time) {
- @InternalCoroutinesApi
+ class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time, id) {
+ @ExperimentalCoroutinesApi
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
@@ -323,7 +340,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Resume[$time]"
}
- class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle {
+ class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle {
override fun run() {
if (!cancelled) {
block.run()
@@ -334,10 +351,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
cancelled = true
}
- override fun toString(): String = "Dispatch[$time]"
+ override fun toString(): String = "Timeout[$time]"
}
- class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
+ class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
override fun run() {
channel.send(message)
}