diff options
Diffstat (limited to 'opendc-kernel-omega/src/main')
| -rw-r--r-- | opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 37 |
1 files changed, 24 insertions, 13 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 11217e8d..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -27,11 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* import com.atlarge.opendc.simulator.instrumentation.Instrument import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -132,22 +134,31 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation - override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { - val channel = Channel<T>(capacity) - val process = object : Process<Unit, M> { - override val initialState = Unit - override suspend fun Context<Unit, M>.run() { - val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} - try { - instrument(builder) - channel.close() - } catch (cause: Throwable) { - channel.close(cause) + override fun openPort(): Port<M> = object : Port<M> { + val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf() + + override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { + val channel = Channel<T>(capacity) + val process = object : Process<Unit, M> { + override val initialState = Unit + override suspend fun Context<Unit, M>.run() { + val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } } } + channels.add(WeakReference(channel)) + register(process) + return channel } - register(process) - return channel + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() } override fun step() { |
