summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-kernel-omega/src/main')
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt23
1 files changed, 23 insertions, 0 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 532a033a..11217e8d 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -25,7 +25,12 @@
package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
import mu.KotlinLogging
import java.util.*
import kotlin.coroutines.experimental.*
@@ -127,6 +132,24 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
schedule(prepare(message, destination, sender, delay))
// Simulation implementation
+ override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val process = object : Process<Unit, M> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, M>.run() {
+ val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
+ try {
+ instrument(builder)
+ channel.close()
+ } catch (cause: Throwable) {
+ channel.close(cause)
+ }
+ }
+ }
+ register(process)
+ return channel
+ }
+
override fun step() {
while (true) {
val envelope = queue.peek() ?: return