summaryrefslogtreecommitdiff
path: root/odcsim/odcsim-engine-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:59:39 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:59:39 +0100
commit878990c8e230a43b534fc8e870f59630152fb6bf (patch)
tree8fafdab0235e3ce09c8ef38520853ac60b290505 /odcsim/odcsim-engine-omega/src
parentefb31902e6950f52ba8b50c2472f867e6d98313b (diff)
feat: Add support for selecting on receive ports
This change adds experimental support for selecting on ports. This allows the user to receive messages from multiple channels at the same time.
Diffstat (limited to 'odcsim/odcsim-engine-omega/src')
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt32
1 files changed, 30 insertions, 2 deletions
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index b0df32d4..15d3bd88 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -36,10 +36,12 @@ import com.atlarge.odcsim.SendRef
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
+import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import java.time.Clock
import java.time.Instant
@@ -91,12 +93,18 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
@InternalCoroutinesApi
private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, block))
+ }
+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
schedule(Event.Resume(clock.time + timeMillis, this, continuation))
}
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, block))
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, block)
+ schedule(event)
+ return event
}
}
@@ -239,6 +247,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private val channel = KChannel<T>(KChannel.CONFLATED)
+ val onReceive: SelectClause1<T>
+ get() = channel.onReceive
+
/**
* Receive a message from this channel.
*/
@@ -281,6 +292,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
return true
}
+ override val onReceive: SelectClause1<T>
+ get() = channel.onReceive
+
override suspend fun receive(): T {
check(!closed) { "Port is closed" }
return channel.receive()
@@ -310,6 +324,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Resume[$time]"
}
+ class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle {
+ override fun run() {
+ if (!cancelled) {
+ block.run()
+ }
+ }
+
+ override fun dispose() {
+ cancelled = true
+ }
+
+ override fun toString(): String = "Dispatch[$time]"
+ }
+
class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
override fun run() {
channel.send(message)