diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-14 12:12:57 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-23 12:20:30 +0100 |
| commit | 699338a4e7b226ae7acf0f4aef1b0b28d90eb5b3 (patch) | |
| tree | fbebd90aebd1863f6bcbe4ccbb790c6c7559c91e | |
| parent | a64ae13a6c5aef435e048b13bb3d7bad449f783b (diff) | |
feat(#12): Implement Instrumentation API in Omega kernel
These changes implement the Instrumentation API described in issue #11
into the Omega simulation kernel.
4 files changed, 56 insertions, 6 deletions
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 68f76569..75f7ed60 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -28,4 +28,4 @@ typealias Instrument<T, M> = suspend InstrumentScope<T, M>.() -> Unit * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface InstrumentScope<in T, out M>: SendChannel<T>, Context<Unit, M> +interface InstrumentScope<in T, M>: SendChannel<T>, Context<Unit, M> diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index 74f1fb36..ce6dc6a4 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -38,7 +38,7 @@ import kotlinx.coroutines.experimental.channels.ReceiveChannel * @param M The shape of the model over which the simulation runs. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Simulation<out M> { +interface Simulation<M> { /** * The model in which the simulation runs. */ diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 532a033a..11217e8d 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -25,7 +25,12 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.InstrumentScope import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging import java.util.* import kotlin.coroutines.experimental.* @@ -127,6 +132,24 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation + override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { + val channel = Channel<T>(capacity) + val process = object : Process<Unit, M> { + override val initialState = Unit + override suspend fun Context<Unit, M>.run() { + val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } + } + } + register(process) + return channel + } + override fun step() { while (true) { val envelope = queue.peek() ?: return diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index c47f9a26..42ca05ec 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,8 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.* import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -123,6 +128,7 @@ internal class SmokeTest { hold(10) } } + /** * Test if the kernel allows access to the simulation model object. */ @@ -135,9 +141,30 @@ internal class SmokeTest { value } - val simulation = OmegaKernel.create(bootstrap) - simulation.run(5) + val kernel = OmegaKernel.create(bootstrap) + kernel.run(5) + } + + + @Test + fun `instrumentation works`() { + val instrument: Instrument<Int, Unit> = { + var value = 0 + + for (i in 1..10) { + send(value) + value += 10 + hold(20) + } + } + + val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit }) + val stream = simulation.install(instrument) + + val res = async(Unconfined) { + stream.consumeEach { println(it) } + } - assertTrue(simulation.run { process.state }) + simulation.run(100) } } |
