diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-01-19 21:59:39 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2020-01-19 21:59:39 +0100 |
| commit | 878990c8e230a43b534fc8e870f59630152fb6bf (patch) | |
| tree | 8fafdab0235e3ce09c8ef38520853ac60b290505 /odcsim | |
| parent | efb31902e6950f52ba8b50c2472f867e6d98313b (diff) | |
feat: Add support for selecting on receive ports
This change adds experimental support for selecting on ports. This
allows the user to receive messages from multiple channels at the same
time.
Diffstat (limited to 'odcsim')
| -rw-r--r-- | odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt | 7 | ||||
| -rw-r--r-- | odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt | 32 |
2 files changed, 37 insertions, 2 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt index 30d4790c..7c730866 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt @@ -24,6 +24,8 @@ package com.atlarge.odcsim +import kotlinx.coroutines.selects.SelectClause1 + /** * A communication endpoint of a specific logical process through which messages pass. * @@ -48,6 +50,11 @@ public interface ReceivePort<out T : Any> : Port { * Receive a message send to this port or suspend the caller while no messages have been received at this port yet. */ public suspend fun receive(): T + + /** + * Clause for select expression for receiving a message from the channel. + */ + val onReceive: SelectClause1<T> } /** diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index b0df32d4..15d3bd88 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -36,10 +36,12 @@ import com.atlarge.odcsim.SendRef import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive +import kotlinx.coroutines.selects.SelectClause1 import org.jetbrains.annotations.Async import java.time.Clock import java.time.Instant @@ -91,12 +93,18 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ @InternalCoroutinesApi private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { + override fun dispatch(context: CoroutineContext, block: Runnable) { + schedule(Event.Dispatch(clock.time, block)) + } + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { schedule(Event.Resume(clock.time + timeMillis, this, continuation)) } - override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, block)) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + val event = Event.Timeout(clock.time + timeMillis, block) + schedule(event) + return event } } @@ -239,6 +247,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channel = KChannel<T>(KChannel.CONFLATED) + val onReceive: SelectClause1<T> + get() = channel.onReceive + /** * Receive a message from this channel. */ @@ -281,6 +292,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : return true } + override val onReceive: SelectClause1<T> + get() = channel.onReceive + override suspend fun receive(): T { check(!closed) { "Port is closed" } return channel.receive() @@ -310,6 +324,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Resume[$time]" } + class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle { + override fun run() { + if (!cancelled) { + block.run() + } + } + + override fun dispose() { + cancelled = true + } + + override fun toString(): String = "Dispatch[$time]" + } + class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) { override fun run() { channel.send(message) |
