diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-02-27 10:17:46 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2018-04-22 22:08:11 +0200 |
| commit | 2c980f4627309b55a160dbe7fc17156201d9dde4 (patch) | |
| tree | e7fef0c75acc1753ac1ac55736cff926b3e16182 | |
| parent | 8bc180adfcbde4cd977474174b846b2fa9dfec5b (diff) | |
feat(#12): Allow closing of instrument streams
This change in Instrumentation API allows the user to close the data
stream of an instrument by introducing a new concept: Port. A user can
open a `Port` for a `Simulation` object and attach an arbitrary amount of
instruments to this port. The data streams are closed by calling
`Port#close()`.
7 files changed, 80 insertions, 47 deletions
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 75f7ed60..7633444e 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -1,19 +1,17 @@ package com.atlarge.opendc.simulator.instrumentation -import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Entity import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -/** + /** * A kernel instrumentation device that allows the observation and measurement of properties of interest within some * model. * * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An - * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which - * the measurements can be extracted out of the simulation. - * + * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from + * which the measurements can be extracted out of the simulation. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt new file mode 100644 index 00000000..caa11761 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel + +/** + * A port allows users to install instrumentation devices to a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Port<M> { + /** + * Install the given instrumentation device to produce a stream of measurements of type <code>T</code>. + * + * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most + * one measurement, so that the receiver always gets the most recently sent element. + * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while + * previously sent elements are lost. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.CONFLATED, instrument) + + /** + * Install the given instrumentation device to produce a stream of measurements of type code>T</code>. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> + + /** + * Close this port and stop the instruments from producing more measurements. + * This is an idempotent operation – repeated invocations of this function have no effect and return false. + * + * @param cause An optional cause that is thrown when trying to receive more elements from the installed + * instruments. + * @return `true` if the port was closed, `false` if it was already closed. + */ + fun close(cause: Throwable? = null): Boolean +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index ce6dc6a4..0954868a 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -27,8 +27,7 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant import com.atlarge.opendc.simulator.instrumentation.Instrument -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.channels.ReceiveChannel +import com.atlarge.opendc.simulator.instrumentation.Port /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -55,28 +54,11 @@ interface Simulation<M> { val <E : Entity<S, *>, S> E.state: S /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * <code>T</code>. + * Open a new [Port] to manage [Instrument]s. * - * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most - * one measurement, so that the receiver always gets the most recently sent element. - * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while - * previously sent elements are lost. - * - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. - */ - fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.CONFLATED, instrument) - - /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * <code>T</code>. - * - * @param capacity The capacity of the buffer of the channel. - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + * @return A new [Port] instance to install [Instrument]s to. */ - fun <T> install(capacity: Int = Channel.CONFLATED, instrument: Instrument<T, M>): ReceiveChannel<T> + fun openPort(): Port<M> /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 11217e8d..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -27,11 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* import com.atlarge.opendc.simulator.instrumentation.Instrument import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -132,22 +134,31 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation - override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { - val channel = Channel<T>(capacity) - val process = object : Process<Unit, M> { - override val initialState = Unit - override suspend fun Context<Unit, M>.run() { - val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} - try { - instrument(builder) - channel.close() - } catch (cause: Throwable) { - channel.close(cause) + override fun openPort(): Port<M> = object : Port<M> { + val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf() + + override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { + val channel = Channel<T>(capacity) + val process = object : Process<Unit, M> { + override val initialState = Unit + override suspend fun Context<Unit, M>.run() { + val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } } } + channels.add(WeakReference(channel)) + register(process) + return channel } - register(process) - return channel + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() } override fun step() { diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 42ca05ec..74d6e2de 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -28,11 +28,10 @@ import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process import com.atlarge.opendc.simulator.instrumentation.Instrument -import com.atlarge.opendc.simulator.kernel.Kernel import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.Unconfined import kotlinx.coroutines.experimental.async -import kotlinx.coroutines.experimental.channels.* +import kotlinx.coroutines.experimental.channels.consumeEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -159,12 +158,11 @@ internal class SmokeTest { } val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit }) - val stream = simulation.install(instrument) + val stream = simulation.openPort().install(instrument) val res = async(Unconfined) { stream.consumeEach { println(it) } } - simulation.run(100) } } diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt index 13f322bc..ab701259 100644 --- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt +++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt @@ -45,7 +45,7 @@ fun main(args: Array<String>) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val timeout = 10000L + val timeout = 30000L val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) diff --git a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml index 4c4e6ac7..a091bdbd 100644 --- a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml +++ b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml @@ -35,7 +35,7 @@ <property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect"/> <property name="hibernate.show_sql" value="false"/> <property name="hibernate.hbm2ddl.auto" value="validate"/> - <property name="hibernate.jdbc.batch_size" value="50"/> + <property name="hibernate.jdbc.batch_size" value="100"/> </properties> </persistence-unit> </persistence> |
