From a64ae13a6c5aef435e048b13bb3d7bad449f783b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:09:52 +0100 Subject: feat(#11): Create Instrumentation API This commit creates the interfaces for the new Instrumentation API described in issue #11. This interface allows users to plug an instrumentation device into a (live) simulation in order extract measurements from the simulation. --- opendc-core/build.gradle | 1 + .../opendc/simulator/instrumentation/Instrument.kt | 31 ++++++++++++++++++++++ .../atlarge/opendc/simulator/kernel/Simulation.kt | 27 +++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt (limited to 'opendc-core') diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle index 223e73f9..210b8820 100644 --- a/opendc-core/build.gradle +++ b/opendc-core/build.gradle @@ -77,6 +77,7 @@ repositories { dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" + compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.2" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt new file mode 100644 index 00000000..68f76569 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -0,0 +1,31 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.Entity +import com.atlarge.opendc.simulator.Context +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel + +/** + * A kernel instrumentation device that allows the observation and measurement of properties of interest within some + * model. + * + * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An + * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which + * the measurements can be extracted out of the simulation. + * + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +typealias Instrument = suspend InstrumentScope.() -> Unit + +/** + * This interface defines the scope in which an instrumentation device is built. + * + * An instrument is a [Process] without any observable state that is allowed to send messages to other [Entity] + * instances in the simulation. In addition, the instrument can emit measurements using the methods provided by the + * [SendChannel] interface. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface InstrumentScope: SendChannel, Context diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index bb2ef818..74f1fb36 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -26,6 +26,9 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant +import com.atlarge.opendc.simulator.instrumentation.Instrument +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -51,6 +54,30 @@ interface Simulation { */ val , S> E.state: S + /** + * Install the given instrumentation device in this kernel to produce a stream of measurements of type + * T. + * + * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most + * one measurement, so that the receiver always gets the most recently sent element. + * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while + * previously sent elements are lost. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + + /** + * Install the given instrumentation device in this kernel to produce a stream of measurements of type + * T. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(capacity: Int = Channel.CONFLATED, instrument: Instrument): ReceiveChannel + /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the * internal clock and then return the control to the user. -- cgit v1.2.3 From 699338a4e7b226ae7acf0f4aef1b0b28d90eb5b3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:12:57 +0100 Subject: feat(#12): Implement Instrumentation API in Omega kernel These changes implement the Instrumentation API described in issue #11 into the Omega simulation kernel. --- .../kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt | 2 +- .../src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'opendc-core') diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 68f76569..75f7ed60 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -28,4 +28,4 @@ typealias Instrument = suspend InstrumentScope.() -> Unit * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface InstrumentScope: SendChannel, Context +interface InstrumentScope: SendChannel, Context diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index 74f1fb36..ce6dc6a4 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -38,7 +38,7 @@ import kotlinx.coroutines.experimental.channels.ReceiveChannel * @param M The shape of the model over which the simulation runs. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Simulation { +interface Simulation { /** * The model in which the simulation runs. */ -- cgit v1.2.3 From 2c980f4627309b55a160dbe7fc17156201d9dde4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 27 Feb 2018 10:17:46 +0100 Subject: feat(#12): Allow closing of instrument streams This change in Instrumentation API allows the user to close the data stream of an instrument by introducing a new concept: Port. A user can open a `Port` for a `Simulation` object and attach an arbitrary amount of instruments to this port. The data streams are closed by calling `Port#close()`. --- .../opendc/simulator/instrumentation/Instrument.kt | 10 ++--- .../opendc/simulator/instrumentation/Port.kt | 44 ++++++++++++++++++++++ .../atlarge/opendc/simulator/kernel/Simulation.kt | 26 ++----------- 3 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt (limited to 'opendc-core') diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 75f7ed60..7633444e 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -1,19 +1,17 @@ package com.atlarge.opendc.simulator.instrumentation -import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Entity import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -/** + /** * A kernel instrumentation device that allows the observation and measurement of properties of interest within some * model. * * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An - * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which - * the measurements can be extracted out of the simulation. - * + * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from + * which the measurements can be extracted out of the simulation. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt new file mode 100644 index 00000000..caa11761 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel + +/** + * A port allows users to install instrumentation devices to a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Port { + /** + * Install the given instrumentation device to produce a stream of measurements of type T. + * + * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most + * one measurement, so that the receiver always gets the most recently sent element. + * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while + * previously sent elements are lost. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + + /** + * Install the given instrumentation device to produce a stream of measurements of type code>T. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(capacity: Int, instrument: Instrument): ReceiveChannel + + /** + * Close this port and stop the instruments from producing more measurements. + * This is an idempotent operation – repeated invocations of this function have no effect and return false. + * + * @param cause An optional cause that is thrown when trying to receive more elements from the installed + * instruments. + * @return `true` if the port was closed, `false` if it was already closed. + */ + fun close(cause: Throwable? = null): Boolean +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index ce6dc6a4..0954868a 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -27,8 +27,7 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant import com.atlarge.opendc.simulator.instrumentation.Instrument -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.channels.ReceiveChannel +import com.atlarge.opendc.simulator.instrumentation.Port /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -55,28 +54,11 @@ interface Simulation { val , S> E.state: S /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * T. + * Open a new [Port] to manage [Instrument]s. * - * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most - * one measurement, so that the receiver always gets the most recently sent element. - * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while - * previously sent elements are lost. - * - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. - */ - fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) - - /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * T. - * - * @param capacity The capacity of the buffer of the channel. - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + * @return A new [Port] instance to install [Instrument]s to. */ - fun install(capacity: Int = Channel.CONFLATED, instrument: Instrument): ReceiveChannel + fun openPort(): Port /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the -- cgit v1.2.3 From 4ccf632ad4418114df0cd8460c7dd3a86c246f9d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 22 Apr 2018 22:20:18 +0200 Subject: feat(#12): Buffer instrumentation measurements This change will make the simulator by default buffer all measurements of attached instruments to prevent strange situations where certain measurements are not recorded due to the processing running on another thread. --- .../kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'opendc-core') diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt index caa11761..880f9a15 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -13,15 +13,13 @@ interface Port { /** * Install the given instrumentation device to produce a stream of measurements of type T. * - * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most - * one measurement, so that the receiver always gets the most recently sent element. - * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while - * previously sent elements are lost. + * The [ReceiveChannel] returned by this channel is by default backed by an unlimited buffer + * (using [Channel.UNLIMITED]), which may induce unnecessary overhead. * * @param instrument The instrumentation device to install. * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. */ - fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + fun install(instrument: Instrument): ReceiveChannel = install(Channel.UNLIMITED, instrument) /** * Install the given instrumentation device to produce a stream of measurements of type code>T. -- cgit v1.2.3