summaryrefslogtreecommitdiff
path: root/odcsim
diff options
context:
space:
mode:
Diffstat (limited to 'odcsim')
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt52
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt47
2 files changed, 84 insertions, 15 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt
new file mode 100644
index 00000000..b15ce3ae
--- /dev/null
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Channels.kt
@@ -0,0 +1,52 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.odcsim
+
+suspend fun <T : Any, U : Any> SendRef<T>.ask(block: (SendRef<U>) -> T): U {
+ val ctx = processContext
+ val outlet = ctx.connect(this)
+ val channel = ctx.open<U>()
+ try {
+ outlet.send(block(channel.send))
+ } finally {
+ outlet.close()
+ }
+
+ val inlet = ctx.listen(channel.receive)
+ try {
+ return inlet.receive()
+ } finally {
+ inlet.close()
+ }
+}
+
+suspend fun <T : Any> SendRef<T>.sendOnce(msg: T) {
+ val outlet = processContext.connect(this)
+ try {
+ outlet.send(msg)
+ } finally {
+ outlet.close()
+ }
+}
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index 34e5fd9a..11dae528 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -46,8 +46,10 @@ import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
import kotlinx.coroutines.DisposableHandle
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
+import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.SelectClause1
@@ -89,20 +91,27 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
private val channels: MutableSet<ChannelImpl<*>> = HashSet()
/**
+ * A unique increasing identifier assigned to each event, needed because otherwise two events occurring in sequence
+ * (but at the same time) may be differently ordered in the internal priority queue (queue) since it does not
+ * guarantee insertion order.
+ */
+ private var nextId: Long = 0
+
+ /**
* The [CoroutineDispatcher] instance for dispatching the coroutines representing the logical behavior.
*/
@InternalCoroutinesApi
private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, block))
+ schedule(Event.Dispatch(clock.time, nextId++, block))
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- schedule(Event.Resume(clock.time + timeMillis, this, continuation))
+ schedule(Event.Resume(clock.time + timeMillis, nextId++, this, continuation))
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
- val event = Event.Timeout(clock.time + timeMillis, block)
+ val event = Event.Timeout(clock.time + timeMillis, nextId++, block)
schedule(event)
return event
}
@@ -174,12 +183,14 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
}
private inner class ProcessImpl(override val self: ProcessRef, private val behavior: Behavior) : ProcessContext, Continuation<Unit> {
+ val job = SupervisorJob()
+
override val clock: Clock
get() = this@OmegaSimulationEngine.clock
override fun spawn(behavior: Behavior): ProcessRef {
val name = "$" + UUID.randomUUID()
- return spawn(behavior, name)
+ return this@OmegaSimulationEngine.spawn(behavior, name)
}
override fun spawn(behavior: Behavior, name: String): ProcessRef {
@@ -209,13 +220,16 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
// Stop the logical process
if (result.isFailure) {
result.exceptionOrNull()!!.printStackTrace()
+ job.completeExceptionally(result.exceptionOrNull()!!)
+ } else {
+ job.complete()
}
}
override val key: CoroutineContext.Key<*> = ProcessContext.Key
@InternalCoroutinesApi
- override val context: CoroutineContext = this + dispatcher
+ override val context: CoroutineContext = this + dispatcher + job
}
/**
@@ -245,7 +259,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
/**
* The underlying `kotlinx.coroutines` channel to back this channel implementation.
*/
- private val channel = KChannel<T>(KChannel.CONFLATED)
+ private val channel = KChannel<T>(KChannel.UNLIMITED)
val onReceive: SelectClause1<T>
get() = channel.onReceive
@@ -275,7 +289,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun send(message: T) {
check(!closed) { "Port is closed" }
- schedule(Event.Send(clock.time, channelImpl, message))
+ schedule(Event.Send(clock.time, nextId++, channelImpl, message))
}
}
@@ -305,17 +319,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*
* @property time The point in time to deliver the message.
*/
- private sealed class Event(val time: Long) : Comparable<Event>, Runnable {
- override fun compareTo(other: Event): Int = time.compareTo(other.time)
+ private sealed class Event(val time: Long, val id: Long) : Comparable<Event>, Runnable {
+ override fun compareTo(other: Event): Int {
+ val cmp = time.compareTo(other.time)
+ return if (cmp == 0) id.compareTo(other.id) else cmp
+ }
- class Dispatch(time: Long, val block: Runnable) : Event(time) {
+ class Dispatch(time: Long, id: Long, val block: Runnable) : Event(time, id) {
override fun run() = block.run()
override fun toString(): String = "Dispatch[$time]"
}
- class Resume(time: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time) {
- @InternalCoroutinesApi
+ class Resume(time: Long, id: Long, val dispatcher: CoroutineDispatcher, val continuation: CancellableContinuation<Unit>) : Event(time, id) {
+ @ExperimentalCoroutinesApi
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
@@ -323,7 +340,7 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Resume[$time]"
}
- class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle {
+ class Timeout(time: Long, id: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time, id), DisposableHandle {
override fun run() {
if (!cancelled) {
block.run()
@@ -334,10 +351,10 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
cancelled = true
}
- override fun toString(): String = "Dispatch[$time]"
+ override fun toString(): String = "Timeout[$time]"
}
- class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
+ class Send<T : Any>(time: Long, id: Long, val channel: ChannelImpl<T>, val message: T) : Event(time, id) {
override fun run() {
channel.send(message)
}