diff options
| author | Fabian Mastenbroek <fabianishere@outlook.com> | 2018-04-22 22:27:13 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-04-22 22:27:13 +0200 |
| commit | 07f245dcf4b01ade251d0f4bedc897d7145b04d1 (patch) | |
| tree | a7b4c49df918e812998074f3ff71b6ba1868d645 | |
| parent | f691a72b12a43fa15c1617966450c55206664797 (diff) | |
| parent | 4ccf632ad4418114df0cd8460c7dd3a86c246f9d (diff) | |
feat(#12): Implement Instrumentation API
These changes contain the specification of the new Instrumentation API for the simulator, in addition to the implementation for the Omega kernel. As an example, the API allows users to measure data from processes in simulation and interpolate data points between the measurements.
Closes #11, #12
9 files changed, 271 insertions, 7 deletions
diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle index 223e73f9..210b8820 100644 --- a/opendc-core/build.gradle +++ b/opendc-core/build.gradle @@ -77,6 +77,7 @@ repositories { dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" + compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.2" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt new file mode 100644 index 00000000..7633444e --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -0,0 +1,29 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Entity +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel + + /** + * A kernel instrumentation device that allows the observation and measurement of properties of interest within some + * model. + * + * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An + * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from + * which the measurements can be extracted out of the simulation. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +typealias Instrument<T, M> = suspend InstrumentScope<T, M>.() -> Unit + +/** + * This interface defines the scope in which an instrumentation device is built. + * + * An instrument is a [Process] without any observable state that is allowed to send messages to other [Entity] + * instances in the simulation. In addition, the instrument can emit measurements using the methods provided by the + * [SendChannel] interface. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface InstrumentScope<in T, M>: SendChannel<T>, Context<Unit, M> diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt new file mode 100644 index 00000000..880f9a15 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -0,0 +1,42 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel + +/** + * A port allows users to install instrumentation devices to a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Port<M> { + /** + * Install the given instrumentation device to produce a stream of measurements of type <code>T</code>. + * + * The [ReceiveChannel] returned by this channel is by default backed by an unlimited buffer + * (using [Channel.UNLIMITED]), which may induce unnecessary overhead. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.UNLIMITED, instrument) + + /** + * Install the given instrumentation device to produce a stream of measurements of type code>T</code>. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> + + /** + * Close this port and stop the instruments from producing more measurements. + * This is an idempotent operation – repeated invocations of this function have no effect and return false. + * + * @param cause An optional cause that is thrown when trying to receive more elements from the installed + * instruments. + * @return `true` if the port was closed, `false` if it was already closed. + */ + fun close(cause: Throwable? = null): Boolean +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index bb2ef818..0954868a 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -26,6 +26,8 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.Port /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -35,7 +37,7 @@ import com.atlarge.opendc.simulator.Instant * @param M The shape of the model over which the simulation runs. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Simulation<out M> { +interface Simulation<M> { /** * The model in which the simulation runs. */ @@ -52,6 +54,13 @@ interface Simulation<out M> { val <E : Entity<S, *>, S> E.state: S /** + * Open a new [Port] to manage [Instrument]s. + * + * @return A new [Port] instance to install [Instrument]s to. + */ + fun openPort(): Port<M> + + /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the * internal clock and then return the control to the user. */ diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 532a033a..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -25,8 +25,15 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -127,6 +134,33 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation + override fun openPort(): Port<M> = object : Port<M> { + val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf() + + override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { + val channel = Channel<T>(capacity) + val process = object : Process<Unit, M> { + override val initialState = Unit + override suspend fun Context<Unit, M>.run() { + val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } + } + } + channels.add(WeakReference(channel)) + register(process) + return channel + } + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() + } + override fun step() { while (true) { val envelope = queue.peek() ?: return diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index c47f9a26..74d6e2de 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,8 +27,12 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.consumeEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -123,6 +127,7 @@ internal class SmokeTest { hold(10) } } + /** * Test if the kernel allows access to the simulation model object. */ @@ -135,9 +140,29 @@ internal class SmokeTest { value } - val simulation = OmegaKernel.create(bootstrap) - simulation.run(5) + val kernel = OmegaKernel.create(bootstrap) + kernel.run(5) + } + + + @Test + fun `instrumentation works`() { + val instrument: Instrument<Int, Unit> = { + var value = 0 - assertTrue(simulation.run { process.state }) + for (i in 1..10) { + send(value) + value += 10 + hold(20) + } + } + + val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit }) + val stream = simulation.openPort().install(instrument) + + val res = async(Unconfined) { + stream.consumeEach { println(it) } + } + simulation.run(100) } } diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt index 13f322bc..ab701259 100644 --- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt +++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt @@ -45,7 +45,7 @@ fun main(args: Array<String>) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val timeout = 10000L + val timeout = 30000L val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) diff --git a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml index 4c4e6ac7..a091bdbd 100644 --- a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml +++ b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml @@ -35,7 +35,7 @@ <property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect"/> <property name="hibernate.show_sql" value="false"/> <property name="hibernate.hbm2ddl.auto" value="validate"/> - <property name="hibernate.jdbc.batch_size" value="50"/> + <property name="hibernate.jdbc.batch_size" value="100"/> </properties> </persistence-unit> </persistence> diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt new file mode 100644 index 00000000..439813e8 --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt @@ -0,0 +1,124 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.simulator.instrumentation + +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.consume +import kotlinx.coroutines.experimental.channels.produce +import kotlin.coroutines.experimental.CoroutineContext + +/** + * Interpolate [n] amount of elements between every two occurrences of elements passing through the channel. + * + * The operation is _intermediate_ and _stateless_. + * This function [consumes][consume] all elements of the original [ReceiveChannel]. + * + * @param context The context of the coroutine. + * @param n The amount of elements to interpolate between the actual elements in the channel. + * @param interpolator A function to interpolate between the two element occurrences. + */ +fun <E> ReceiveChannel<E>.interpolate(n: Int, context: CoroutineContext = Unconfined, + interpolator: (Double, E, E) -> E): ReceiveChannel<E> = + produce(context) { + consume { + val iterator = iterator() + + if (!iterator.hasNext()) + return@produce + + var a = iterator.next() + send(a) + + while (iterator.hasNext()) { + val b = iterator.next() + for (i in 1..n) { + send(interpolator(i.toDouble() / (n + 1), a, b)) + } + send(b) + a = b + } + } + } + +/** + * Perform a linear interpolation on the given double values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated double result between the double values. + */ +fun lerp(a: Double, b: Double, f: Double): Double = a + f * (b - a) + +/** + * Perform a linear interpolation on the given float values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated float result between the float values. + */ +fun lerp(a: Float, b: Float, f: Float): Float = a + f * (b - a) + +/** + * Perform a linear interpolation on the given integer values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated integer result between the integer values. + */ +fun lerp(a: Int, b: Int, f: Float): Int = lerp(a.toFloat(), b.toFloat(), f).toInt() + +/** + * Perform a linear interpolation on the given integer values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated integer result between the integer values. + */ +fun lerp(a: Int, b: Int, f: Double): Int = lerp(a.toDouble(), b.toDouble(), f).toInt() + +/** + * Perform a linear interpolation on the given long values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated long result between the long values. + */ +fun lerp(a: Long, b: Long, f: Double): Long = lerp(a.toDouble(), b.toDouble(), f).toLong() + +/** + * Perform a linear interpolation on the given long values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated long result between the long values. + */ +fun lerp(a: Long, b: Long, f: Float): Long = lerp(a.toFloat(), b.toFloat(), f).toLong() |
