From 2c980f4627309b55a160dbe7fc17156201d9dde4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 27 Feb 2018 10:17:46 +0100 Subject: feat(#12): Allow closing of instrument streams This change in Instrumentation API allows the user to close the data stream of an instrument by introducing a new concept: Port. A user can open a `Port` for a `Simulation` object and attach an arbitrary amount of instruments to this port. The data streams are closed by calling `Port#close()`. --- .../opendc/simulator/instrumentation/Instrument.kt | 10 ++--- .../opendc/simulator/instrumentation/Port.kt | 44 ++++++++++++++++++++++ .../atlarge/opendc/simulator/kernel/Simulation.kt | 26 ++----------- 3 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt (limited to 'opendc-core') diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 75f7ed60..7633444e 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -1,19 +1,17 @@ package com.atlarge.opendc.simulator.instrumentation -import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Entity import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -/** + /** * A kernel instrumentation device that allows the observation and measurement of properties of interest within some * model. * * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An - * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which - * the measurements can be extracted out of the simulation. - * + * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from + * which the measurements can be extracted out of the simulation. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt new file mode 100644 index 00000000..caa11761 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel + +/** + * A port allows users to install instrumentation devices to a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Port { + /** + * Install the given instrumentation device to produce a stream of measurements of type T. + * + * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most + * one measurement, so that the receiver always gets the most recently sent element. + * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while + * previously sent elements are lost. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + + /** + * Install the given instrumentation device to produce a stream of measurements of type code>T. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(capacity: Int, instrument: Instrument): ReceiveChannel + + /** + * Close this port and stop the instruments from producing more measurements. + * This is an idempotent operation – repeated invocations of this function have no effect and return false. + * + * @param cause An optional cause that is thrown when trying to receive more elements from the installed + * instruments. + * @return `true` if the port was closed, `false` if it was already closed. + */ + fun close(cause: Throwable? = null): Boolean +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index ce6dc6a4..0954868a 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -27,8 +27,7 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant import com.atlarge.opendc.simulator.instrumentation.Instrument -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.channels.ReceiveChannel +import com.atlarge.opendc.simulator.instrumentation.Port /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -55,28 +54,11 @@ interface Simulation { val , S> E.state: S /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * T. + * Open a new [Port] to manage [Instrument]s. * - * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most - * one measurement, so that the receiver always gets the most recently sent element. - * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while - * previously sent elements are lost. - * - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. - */ - fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) - - /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * T. - * - * @param capacity The capacity of the buffer of the channel. - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + * @return A new [Port] instance to install [Instrument]s to. */ - fun install(capacity: Int = Channel.CONFLATED, instrument: Instrument): ReceiveChannel + fun openPort(): Port /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the -- cgit v1.2.3