diff options
| author | Fabian Mastenbroek <fabianishere@outlook.com> | 2018-04-22 22:27:13 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-04-22 22:27:13 +0200 |
| commit | 07f245dcf4b01ade251d0f4bedc897d7145b04d1 (patch) | |
| tree | a7b4c49df918e812998074f3ff71b6ba1868d645 /opendc-kernel-omega/src/main | |
| parent | f691a72b12a43fa15c1617966450c55206664797 (diff) | |
| parent | 4ccf632ad4418114df0cd8460c7dd3a86c246f9d (diff) | |
feat(#12): Implement Instrumentation API
These changes contain the specification of the new Instrumentation API for the simulator, in addition to the implementation for the Omega kernel. As an example, the API allows users to measure data from processes in simulation and interpolate data points between the measurements.
Closes #11, #12
Diffstat (limited to 'opendc-kernel-omega/src/main')
| -rw-r--r-- | opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 532a033a..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -25,8 +25,15 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -127,6 +134,33 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation + override fun openPort(): Port<M> = object : Port<M> { + val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf() + + override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { + val channel = Channel<T>(capacity) + val process = object : Process<Unit, M> { + override val initialState = Unit + override suspend fun Context<Unit, M>.run() { + val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } + } + } + channels.add(WeakReference(channel)) + register(process) + return channel + } + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() + } + override fun step() { while (true) { val envelope = queue.peek() ?: return |
