diff options
| -rw-r--r-- | odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt | 7 | ||||
| -rw-r--r-- | odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt | 32 |
2 files changed, 37 insertions, 2 deletions
diff --git a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt index 30d4790c..7c730866 100644 --- a/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt +++ b/odcsim/odcsim-api/src/main/kotlin/com/atlarge/odcsim/Port.kt @@ -24,6 +24,8 @@ package com.atlarge.odcsim +import kotlinx.coroutines.selects.SelectClause1 + /** * A communication endpoint of a specific logical process through which messages pass. * @@ -48,6 +50,11 @@ public interface ReceivePort<out T : Any> : Port { * Receive a message send to this port or suspend the caller while no messages have been received at this port yet. */ public suspend fun receive(): T + + /** + * Clause for select expression for receiving a message from the channel. + */ + val onReceive: SelectClause1<T> } /** diff --git a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt index b0df32d4..15d3bd88 100644 --- a/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt +++ b/odcsim/odcsim-engine-omega/src/main/kotlin/com/atlarge/odcsim/engine/omega/OmegaSimulationEngine.kt @@ -36,10 +36,12 @@ import com.atlarge.odcsim.SendRef import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Delay +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.Runnable import kotlinx.coroutines.channels.Channel as KChannel import kotlinx.coroutines.isActive +import kotlinx.coroutines.selects.SelectClause1 import org.jetbrains.annotations.Async import java.time.Clock import java.time.Instant @@ -91,12 +93,18 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ @InternalCoroutinesApi private val dispatcher: CoroutineDispatcher = object : CoroutineDispatcher(), Delay { + override fun dispatch(context: CoroutineContext, block: Runnable) { + schedule(Event.Dispatch(clock.time, block)) + } + override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { schedule(Event.Resume(clock.time + timeMillis, this, continuation)) } - override fun dispatch(context: CoroutineContext, block: Runnable) { - schedule(Event.Dispatch(clock.time, block)) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + val event = Event.Timeout(clock.time + timeMillis, block) + schedule(event) + return event } } @@ -239,6 +247,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : */ private val channel = KChannel<T>(KChannel.CONFLATED) + val onReceive: SelectClause1<T> + get() = channel.onReceive + /** * Receive a message from this channel. */ @@ -281,6 +292,9 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : return true } + override val onReceive: SelectClause1<T> + get() = channel.onReceive + override suspend fun receive(): T { check(!closed) { "Port is closed" } return channel.receive() @@ -310,6 +324,20 @@ class OmegaSimulationEngine(rootBehavior: Behavior, override val name: String) : override fun toString(): String = "Resume[$time]" } + class Timeout(time: Long, val block: Runnable, var cancelled: Boolean = false) : Event(time), DisposableHandle { + override fun run() { + if (!cancelled) { + block.run() + } + } + + override fun dispose() { + cancelled = true + } + + override fun toString(): String = "Dispatch[$time]" + } + class Send<T : Any>(time: Long, val channel: ChannelImpl<T>, val message: T) : Event(time) { override fun run() { channel.send(message) |
