summaryrefslogtreecommitdiff
path: root/odcsim
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:59:39 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2020-01-19 21:59:39 +0100
commit878990c8e230a43b534fc8e870f59630152fb6bf (patch)
tree8fafdab0235e3ce09c8ef38520853ac60b290505 /odcsim
parentefb31902e6950f52ba8b50c2472f867e6d98313b (diff)
feat: Add support for selecting on receive ports
This change adds experimental support for selecting on ports. This allows the user to receive messages from multiple channels at the same time.
Diffstat (limited to 'odcsim')
-rw-r--r--odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt7
-rw-r--r--odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt32
2 files changed, 37 insertions, 2 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
index 30d4790c..7c730866 100644
--- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
+++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt
@@ -24,6 +24,8 @@
package com.atlarge.odcsim
+import kotlinx.coroutines.selects.SelectClause1
+
/**
* A communication endpoint of a specific logical process through which messages pass.
*
@@ -48,6 +50,11 @@ public interface ReceivePort<out T : Any> : Port {
* Receive a message send to this port or suspend the caller while no messages have been received at this port yet.
*/
public suspend fun receive(): T
+
+ /**
+ * Clause for select expression for receiving a message from the channel.
+ */
+ val onReceive: SelectClause1<T>
}
/**
diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
index b0df32d4..15d3bd88 100644
--- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
+++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt
@@ -36,10 +36,12 @@ import com.atlarge.odcsim.SendRef
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Delay
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.channels.Channel as KChannel
import kotlinx.coroutines.isActive
+import kotlinx.coroutines.selects.SelectClause1
import org.jetbrains.annotations.Async
import java.time.Clock
import java.time.Instant
@@ -91,12 +93,18 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
@InternalCoroutinesApi
private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ schedule(Event.Dispatch(clock.time, block))
+ }
+
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
schedule(Event.Resume(clock.time + timeMillis, this, continuation))
}
- override fun dispatch(context: CoroutineContext, block: Runnable) {
- schedule(Event.Dispatch(clock.time, block))
+ override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
+ val event = Event.Timeout(clock.time + timeMillis, block)
+ schedule(event)
+ return event
}
}
@@ -239,6 +247,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
*/
private val channel = KChannel<T>(KChannel.CONFLATED)
+ val onReceive: SelectClause1<T>
+ get() = channel.onReceive
+
/**
* Receive a message from this channel.
*/
@@ -281,6 +292,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
return true
}
+ override val onReceive: SelectClause1<T>
+ get() = channel.onReceive
+
override suspend fun receive(): T {
check(!closed) { "Port is closed" }
return channel.receive()
@@ -310,6 +324,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) :
override fun toString(): String = "Resume[$time]"
}
+ class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle {
+ override fun run() {
+ if (!cancelled) {
+ block.run()
+ }
+ }
+
+ override fun dispose() {
+ cancelled = true
+ }
+
+ override fun toString(): String = "Dispatch[$time]"
+ }
+
class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) {
override fun run() {
channel.send(message)