From 699338a4e7b226ae7acf0f4aef1b0b28d90eb5b3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:12:57 +0100 Subject: feat(#12): Implement Instrumentation API in Omega kernel These changes implement the Instrumentation API described in issue #11 into the Omega simulation kernel. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 23 ++++++++++++++ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 35 +++++++++++++++++++--- 2 files changed, 54 insertions(+), 4 deletions(-) (limited to 'opendc-kernel-omega') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 532a033a..11217e8d 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -25,7 +25,12 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.InstrumentScope import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging import java.util.* import kotlin.coroutines.experimental.* @@ -127,6 +132,24 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation + override fun install(capacity: Int, instrument: Instrument): ReceiveChannel { + val channel = Channel(capacity) + val process = object : Process { + override val initialState = Unit + override suspend fun Context.run() { + val builder = object : InstrumentScope, SendChannel by channel, Context by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } + } + } + register(process) + return channel + } + override fun step() { while (true) { val envelope = queue.peek() ?: return diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index c47f9a26..42ca05ec 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,8 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.* import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -123,6 +128,7 @@ internal class SmokeTest { hold(10) } } + /** * Test if the kernel allows access to the simulation model object. */ @@ -135,9 +141,30 @@ internal class SmokeTest { value } - val simulation = OmegaKernel.create(bootstrap) - simulation.run(5) + val kernel = OmegaKernel.create(bootstrap) + kernel.run(5) + } + + + @Test + fun `instrumentation works`() { + val instrument: Instrument = { + var value = 0 + + for (i in 1..10) { + send(value) + value += 10 + hold(20) + } + } + + val simulation: Simulation = OmegaKernel.create(Bootstrap.create { Unit }) + val stream = simulation.install(instrument) + + val res = async(Unconfined) { + stream.consumeEach { println(it) } + } - assertTrue(simulation.run { process.state }) + simulation.run(100) } } -- cgit v1.2.3 From 2c980f4627309b55a160dbe7fc17156201d9dde4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 27 Feb 2018 10:17:46 +0100 Subject: feat(#12): Allow closing of instrument streams This change in Instrumentation API allows the user to close the data stream of an instrument by introducing a new concept: Port. A user can open a `Port` for a `Simulation` object and attach an arbitrary amount of instruments to this port. The data streams are closed by calling `Port#close()`. --- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 37 ++++++++++++++-------- .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 6 ++-- 2 files changed, 26 insertions(+), 17 deletions(-) (limited to 'opendc-kernel-omega') diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 11217e8d..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -27,11 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* import com.atlarge.opendc.simulator.instrumentation.Instrument import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -132,22 +134,31 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation - override fun install(capacity: Int, instrument: Instrument): ReceiveChannel { - val channel = Channel(capacity) - val process = object : Process { - override val initialState = Unit - override suspend fun Context.run() { - val builder = object : InstrumentScope, SendChannel by channel, Context by this {} - try { - instrument(builder) - channel.close() - } catch (cause: Throwable) { - channel.close(cause) + override fun openPort(): Port = object : Port { + val channels: MutableSet>> = mutableSetOf() + + override fun install(capacity: Int, instrument: Instrument): ReceiveChannel { + val channel = Channel(capacity) + val process = object : Process { + override val initialState = Unit + override suspend fun Context.run() { + val builder = object : InstrumentScope, SendChannel by channel, Context by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } } } + channels.add(WeakReference(channel)) + register(process) + return channel } - register(process) - return channel + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() } override fun step() { diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 42ca05ec..74d6e2de 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -28,11 +28,10 @@ import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process import com.atlarge.opendc.simulator.instrumentation.Instrument -import com.atlarge.opendc.simulator.kernel.Kernel import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.Unconfined import kotlinx.coroutines.experimental.async -import kotlinx.coroutines.experimental.channels.* +import kotlinx.coroutines.experimental.channels.consumeEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -159,12 +158,11 @@ internal class SmokeTest { } val simulation: Simulation = OmegaKernel.create(Bootstrap.create { Unit }) - val stream = simulation.install(instrument) + val stream = simulation.openPort().install(instrument) val res = async(Unconfined) { stream.consumeEach { println(it) } } - simulation.run(100) } } -- cgit v1.2.3