summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:59:39 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:59:39 +0100
commit878990c8e230a43b534fc8e870f59630152fb6bf (patch)
tree8fafdab0235e3ce09c8ef38520853ac60b290505
parentefb31902e6950f52ba8b50c2472f867e6d98313b (diff)
feat: Add support for selecting on receive ports
This change adds experimental support for selecting on ports. This allows the user to receive messages from multiple channels at the same time.
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt7
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt32
2 files changed, 37 insertions, 2 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
index 30d4790c..7c730866 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
@@ -24,6 +24,8 @@
package com.atlarge.odcsim
+import kotlinx.coroutines.selects.SelectClause1
+
/**
* A communication endpoint of a specific logical process through which messages pass.
*
@@ -48,6 +50,11 @@ public interface ReceivePort<out T : Any> : Port {
* Receive a message send to this port or suspend the caller while no messages have been received at this port yet.
*/
public suspend fun receive(): T
+
+ /**
+ * Clause for select expression for receiving a message from the channel.
+ */
+ val onReceive: SelectClause1<T>
}
/**
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index b0df32d4..15d3bd88 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -36,10 +36,12 @@ import com.atlarge.odcsim.SendRef
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
+import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import java.time.Clock
import java.time.Instant
@@ -91,12 +93,18 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
@InternalCoroutinesApi
private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, block))
+ }
+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
schedule(Event.Resume(clock.time + timeMillis, this, continuation))
}
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, block))
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, block)
+ schedule(event)
+ return event
}
}
@@ -239,6 +247,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private val channel = KChannel<T>(KChannel.CONFLATED)
+ val onReceive: SelectClause1<T>
+ get() = channel.onReceive
+
/**
* Receive a message from this channel.
*/
@@ -281,6 +292,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
return true
}
+ override val onReceive: SelectClause1<T>
+ get() = channel.onReceive
+
override suspend fun receive(): T {
check(!closed) { "Port is closed" }
return channel.receive()
@@ -310,6 +324,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Resume[$time]"
}
+ class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle {
+ override fun run() {
+ if (!cancelled) {
+ block.run()
+ }
+ }
+
+ override fun dispose() {
+ cancelled = true
+ }
+
+ override fun toString(): String = "Dispatch[$time]"
+ }
+
class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
override fun run() {
channel.send(message)