summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-27 10:17:46 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2018-04-22 22:08:11 +0200
commit2c980f4627309b55a160dbe7fc17156201d9dde4 (patch)
treee7fef0c75acc1753ac1ac55736cff926b3e16182 /opendc-kernel-omega
parent8bc180adfcbde4cd977474174b846b2fa9dfec5b (diff)
feat(#12): Allow closing of instrument streams
This change in Instrumentation API allows the user to close the data stream of an instrument by introducing a new concept: Port. A user can open a `Port` for a `Simulation` object and attach an arbitrary amount of instruments to this port. The data streams are closed by calling `Port#close()`.
Diffstat (limited to 'opendc-kernel-omega')
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt37
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt6
2 files changed, 26 insertions, 17 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 11217e8d..4e3aa16c 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -27,11 +27,13 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
import com.atlarge.opendc.simulator.instrumentation.Instrument
import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
+import com.atlarge.opendc.simulator.instrumentation.Port
import com.atlarge.opendc.simulator.kernel.Simulation
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import mu.KotlinLogging
+import java.lang.ref.WeakReference
import java.util.*
import kotlin.coroutines.experimental.*
@@ -132,22 +134,31 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
schedule(prepare(message, destination, sender, delay))
// Simulation implementation
- override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
- val channel = Channel<T>(capacity)
- val process = object : Process<Unit, M> {
- override val initialState = Unit
- override suspend fun Context<Unit, M>.run() {
- val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
- try {
- instrument(builder)
- channel.close()
- } catch (cause: Throwable) {
- channel.close(cause)
+ override fun openPort(): Port<M> = object : Port<M> {
+ val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf()
+
+ override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val process = object : Process<Unit, M> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, M>.run() {
+ val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
+ try {
+ instrument(builder)
+ channel.close()
+ } catch (cause: Throwable) {
+ channel.close(cause)
+ }
}
}
+ channels.add(WeakReference(channel))
+ register(process)
+ return channel
}
- register(process)
- return channel
+
+ override fun close(cause: Throwable?): Boolean = channels
+ .map { it.get()?.close(cause) ?: false }
+ .any()
}
override fun step() {
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index 42ca05ec..74d6e2de 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -28,11 +28,10 @@ import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
import com.atlarge.opendc.simulator.instrumentation.Instrument
-import com.atlarge.opendc.simulator.kernel.Kernel
import com.atlarge.opendc.simulator.kernel.Simulation
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.async
-import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.channels.consumeEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -159,12 +158,11 @@ internal class SmokeTest {
}
val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit })
- val stream = simulation.install(instrument)
+ val stream = simulation.openPort().install(instrument)
val res = async(Unconfined) {
stream.consumeEach { println(it) }
}
-
simulation.run(100)
}
}