From a64ae13a6c5aef435e048b13bb3d7bad449f783b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:09:52 +0100 Subject: feat(#11): Create Instrumentation API This commit creates the interfaces for the new Instrumentation API described in issue #11. This interface allows users to plug an instrumentation device into a (live) simulation in order extract measurements from the simulation. --- opendc-core/build.gradle | 1 + .../opendc/simulator/instrumentation/Instrument.kt | 31 ++++++++++++++++++++++ .../atlarge/opendc/simulator/kernel/Simulation.kt | 27 +++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle index 223e73f9..210b8820 100644 --- a/opendc-core/build.gradle +++ b/opendc-core/build.gradle @@ -77,6 +77,7 @@ repositories { dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" + compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.2" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt new file mode 100644 index 00000000..68f76569 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -0,0 +1,31 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.Entity +import com.atlarge.opendc.simulator.Context +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel + +/** + * A kernel instrumentation device that allows the observation and measurement of properties of interest within some + * model. + * + * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An + * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which + * the measurements can be extracted out of the simulation. + * + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +typealias Instrument = suspend InstrumentScope.() -> Unit + +/** + * This interface defines the scope in which an instrumentation device is built. + * + * An instrument is a [Process] without any observable state that is allowed to send messages to other [Entity] + * instances in the simulation. In addition, the instrument can emit measurements using the methods provided by the + * [SendChannel] interface. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface InstrumentScope: SendChannel, Context diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index bb2ef818..74f1fb36 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -26,6 +26,9 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant +import com.atlarge.opendc.simulator.instrumentation.Instrument +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -51,6 +54,30 @@ interface Simulation { */ val , S> E.state: S + /** + * Install the given instrumentation device in this kernel to produce a stream of measurements of type + * T. + * + * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most + * one measurement, so that the receiver always gets the most recently sent element. + * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while + * previously sent elements are lost. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + + /** + * Install the given instrumentation device in this kernel to produce a stream of measurements of type + * T. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(capacity: Int = Channel.CONFLATED, instrument: Instrument): ReceiveChannel + /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the * internal clock and then return the control to the user. -- cgit v1.2.3 From 699338a4e7b226ae7acf0f4aef1b0b28d90eb5b3 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 14 Feb 2018 12:12:57 +0100 Subject: feat(#12): Implement Instrumentation API in Omega kernel These changes implement the Instrumentation API described in issue #11 into the Omega simulation kernel. --- .../opendc/simulator/instrumentation/Instrument.kt | 2 +- .../atlarge/opendc/simulator/kernel/Simulation.kt | 2 +- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 23 ++++++++++++++ .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 35 +++++++++++++++++++--- 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 68f76569..75f7ed60 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -28,4 +28,4 @@ typealias Instrument = suspend InstrumentScope.() -> Unit * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface InstrumentScope: SendChannel, Context +interface InstrumentScope: SendChannel, Context diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index 74f1fb36..ce6dc6a4 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -38,7 +38,7 @@ import kotlinx.coroutines.experimental.channels.ReceiveChannel * @param M The shape of the model over which the simulation runs. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Simulation { +interface Simulation { /** * The model in which the simulation runs. */ diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 532a033a..11217e8d 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -25,7 +25,12 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.InstrumentScope import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging import java.util.* import kotlin.coroutines.experimental.* @@ -127,6 +132,24 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation + override fun install(capacity: Int, instrument: Instrument): ReceiveChannel { + val channel = Channel(capacity) + val process = object : Process { + override val initialState = Unit + override suspend fun Context.run() { + val builder = object : InstrumentScope, SendChannel by channel, Context by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } + } + } + register(process) + return channel + } + override fun step() { while (true) { val envelope = queue.peek() ?: return diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index c47f9a26..42ca05ec 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,8 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.kernel.Kernel +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.* import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -123,6 +128,7 @@ internal class SmokeTest { hold(10) } } + /** * Test if the kernel allows access to the simulation model object. */ @@ -135,9 +141,30 @@ internal class SmokeTest { value } - val simulation = OmegaKernel.create(bootstrap) - simulation.run(5) + val kernel = OmegaKernel.create(bootstrap) + kernel.run(5) + } + + + @Test + fun `instrumentation works`() { + val instrument: Instrument = { + var value = 0 + + for (i in 1..10) { + send(value) + value += 10 + hold(20) + } + } + + val simulation: Simulation = OmegaKernel.create(Bootstrap.create { Unit }) + val stream = simulation.install(instrument) + + val res = async(Unconfined) { + stream.consumeEach { println(it) } + } - assertTrue(simulation.run { process.state }) + simulation.run(100) } } -- cgit v1.2.3 From 8bc180adfcbde4cd977474174b846b2fa9dfec5b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 20 Feb 2018 00:16:28 +0100 Subject: feat(#12): Add support for measurement interpolation This change adds interpolation functionality to the standard library for instrumentation devices. --- .../simulator/instrumentation/Interpolation.kt | 124 +++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt new file mode 100644 index 00000000..439813e8 --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt @@ -0,0 +1,124 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.simulator.instrumentation + +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.consume +import kotlinx.coroutines.experimental.channels.produce +import kotlin.coroutines.experimental.CoroutineContext + +/** + * Interpolate [n] amount of elements between every two occurrences of elements passing through the channel. + * + * The operation is _intermediate_ and _stateless_. + * This function [consumes][consume] all elements of the original [ReceiveChannel]. + * + * @param context The context of the coroutine. + * @param n The amount of elements to interpolate between the actual elements in the channel. + * @param interpolator A function to interpolate between the two element occurrences. + */ +fun ReceiveChannel.interpolate(n: Int, context: CoroutineContext = Unconfined, + interpolator: (Double, E, E) -> E): ReceiveChannel = + produce(context) { + consume { + val iterator = iterator() + + if (!iterator.hasNext()) + return@produce + + var a = iterator.next() + send(a) + + while (iterator.hasNext()) { + val b = iterator.next() + for (i in 1..n) { + send(interpolator(i.toDouble() / (n + 1), a, b)) + } + send(b) + a = b + } + } + } + +/** + * Perform a linear interpolation on the given double values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated double result between the double values. + */ +fun lerp(a: Double, b: Double, f: Double): Double = a + f * (b - a) + +/** + * Perform a linear interpolation on the given float values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated float result between the float values. + */ +fun lerp(a: Float, b: Float, f: Float): Float = a + f * (b - a) + +/** + * Perform a linear interpolation on the given integer values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated integer result between the integer values. + */ +fun lerp(a: Int, b: Int, f: Float): Int = lerp(a.toFloat(), b.toFloat(), f).toInt() + +/** + * Perform a linear interpolation on the given integer values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated integer result between the integer values. + */ +fun lerp(a: Int, b: Int, f: Double): Int = lerp(a.toDouble(), b.toDouble(), f).toInt() + +/** + * Perform a linear interpolation on the given long values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated long result between the long values. + */ +fun lerp(a: Long, b: Long, f: Double): Long = lerp(a.toDouble(), b.toDouble(), f).toLong() + +/** + * Perform a linear interpolation on the given long values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated long result between the long values. + */ +fun lerp(a: Long, b: Long, f: Float): Long = lerp(a.toFloat(), b.toFloat(), f).toLong() -- cgit v1.2.3 From 2c980f4627309b55a160dbe7fc17156201d9dde4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 27 Feb 2018 10:17:46 +0100 Subject: feat(#12): Allow closing of instrument streams This change in Instrumentation API allows the user to close the data stream of an instrument by introducing a new concept: Port. A user can open a `Port` for a `Simulation` object and attach an arbitrary amount of instruments to this port. The data streams are closed by calling `Port#close()`. --- .../opendc/simulator/instrumentation/Instrument.kt | 10 ++--- .../opendc/simulator/instrumentation/Port.kt | 44 ++++++++++++++++++++++ .../atlarge/opendc/simulator/kernel/Simulation.kt | 26 ++----------- .../com/atlarge/opendc/omega/OmegaSimulation.kt | 37 +++++++++++------- .../kotlin/com/atlarge/opendc/omega/SmokeTest.kt | 6 +-- .../opendc/model/odc/platform/JpaPlatformRunner.kt | 2 +- .../src/main/resources/META-INF/persistence.xml | 2 +- 7 files changed, 80 insertions(+), 47 deletions(-) create mode 100644 opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt index 75f7ed60..7633444e 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -1,19 +1,17 @@ package com.atlarge.opendc.simulator.instrumentation -import com.atlarge.opendc.simulator.kernel.Kernel -import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Entity import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel -/** + /** * A kernel instrumentation device that allows the observation and measurement of properties of interest within some * model. * * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An - * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which - * the measurements can be extracted out of the simulation. - * + * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from + * which the measurements can be extracted out of the simulation. * * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt new file mode 100644 index 00000000..caa11761 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -0,0 +1,44 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel + +/** + * A port allows users to install instrumentation devices to a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Port { + /** + * Install the given instrumentation device to produce a stream of measurements of type T. + * + * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most + * one measurement, so that the receiver always gets the most recently sent element. + * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while + * previously sent elements are lost. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + + /** + * Install the given instrumentation device to produce a stream of measurements of type code>T. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun install(capacity: Int, instrument: Instrument): ReceiveChannel + + /** + * Close this port and stop the instruments from producing more measurements. + * This is an idempotent operation – repeated invocations of this function have no effect and return false. + * + * @param cause An optional cause that is thrown when trying to receive more elements from the installed + * instruments. + * @return `true` if the port was closed, `false` if it was already closed. + */ + fun close(cause: Throwable? = null): Boolean +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index ce6dc6a4..0954868a 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -27,8 +27,7 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant import com.atlarge.opendc.simulator.instrumentation.Instrument -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.channels.ReceiveChannel +import com.atlarge.opendc.simulator.instrumentation.Port /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -55,28 +54,11 @@ interface Simulation { val , S> E.state: S /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * T. + * Open a new [Port] to manage [Instrument]s. * - * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most - * one measurement, so that the receiver always gets the most recently sent element. - * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while - * previously sent elements are lost. - * - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. - */ - fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) - - /** - * Install the given instrumentation device in this kernel to produce a stream of measurements of type - * T. - * - * @param capacity The capacity of the buffer of the channel. - * @param instrument The instrumentation device to install. - * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + * @return A new [Port] instance to install [Instrument]s to. */ - fun install(capacity: Int = Channel.CONFLATED, instrument: Instrument): ReceiveChannel + fun openPort(): Port /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 11217e8d..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -27,11 +27,13 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* import com.atlarge.opendc.simulator.instrumentation.Instrument import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.channels.Channel import kotlinx.coroutines.experimental.channels.ReceiveChannel import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -132,22 +134,31 @@ internal class OmegaSimulation(bootstrap: Bootstrap) : Simulation, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation - override fun install(capacity: Int, instrument: Instrument): ReceiveChannel { - val channel = Channel(capacity) - val process = object : Process { - override val initialState = Unit - override suspend fun Context.run() { - val builder = object : InstrumentScope, SendChannel by channel, Context by this {} - try { - instrument(builder) - channel.close() - } catch (cause: Throwable) { - channel.close(cause) + override fun openPort(): Port = object : Port { + val channels: MutableSet>> = mutableSetOf() + + override fun install(capacity: Int, instrument: Instrument): ReceiveChannel { + val channel = Channel(capacity) + val process = object : Process { + override val initialState = Unit + override suspend fun Context.run() { + val builder = object : InstrumentScope, SendChannel by channel, Context by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } } } + channels.add(WeakReference(channel)) + register(process) + return channel } - register(process) - return channel + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() } override fun step() { diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index 42ca05ec..74d6e2de 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -28,11 +28,10 @@ import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process import com.atlarge.opendc.simulator.instrumentation.Instrument -import com.atlarge.opendc.simulator.kernel.Kernel import com.atlarge.opendc.simulator.kernel.Simulation import kotlinx.coroutines.experimental.Unconfined import kotlinx.coroutines.experimental.async -import kotlinx.coroutines.experimental.channels.* +import kotlinx.coroutines.experimental.channels.consumeEach import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -159,12 +158,11 @@ internal class SmokeTest { } val simulation: Simulation = OmegaKernel.create(Bootstrap.create { Unit }) - val stream = simulation.install(instrument) + val stream = simulation.openPort().install(instrument) val res = async(Unconfined) { stream.consumeEach { println(it) } } - simulation.run(100) } } diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt index 13f322bc..ab701259 100644 --- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt +++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt @@ -45,7 +45,7 @@ fun main(args: Array) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val timeout = 10000L + val timeout = 30000L val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) diff --git a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml index 4c4e6ac7..a091bdbd 100644 --- a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml +++ b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml @@ -35,7 +35,7 @@ - + -- cgit v1.2.3 From 4ccf632ad4418114df0cd8460c7dd3a86c246f9d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Sun, 22 Apr 2018 22:20:18 +0200 Subject: feat(#12): Buffer instrumentation measurements This change will make the simulator by default buffer all measurements of attached instruments to prevent strange situations where certain measurements are not recorded due to the processing running on another thread. --- .../kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt index caa11761..880f9a15 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -13,15 +13,13 @@ interface Port { /** * Install the given instrumentation device to produce a stream of measurements of type T. * - * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most - * one measurement, so that the receiver always gets the most recently sent element. - * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while - * previously sent elements are lost. + * The [ReceiveChannel] returned by this channel is by default backed by an unlimited buffer + * (using [Channel.UNLIMITED]), which may induce unnecessary overhead. * * @param instrument The instrumentation device to install. * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. */ - fun install(instrument: Instrument): ReceiveChannel = install(Channel.CONFLATED, instrument) + fun install(instrument: Instrument): ReceiveChannel = install(Channel.UNLIMITED, instrument) /** * Install the given instrumentation device to produce a stream of measurements of type code>T. -- cgit v1.2.3