diff options
9 files changed, 271 insertions, 7 deletions
diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle index 223e73f9..210b8820 100644 --- a/opendc-core/build.gradle +++ b/opendc-core/build.gradle @@ -77,6 +77,7 @@ repositories { dependencies { compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" + compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.2" testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3" testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3" diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt new file mode 100644 index 00000000..7633444e --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt @@ -0,0 +1,29 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.Context +import com.atlarge.opendc.simulator.Entity +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel + + /** + * A kernel instrumentation device that allows the observation and measurement of properties of interest within some + * model. + * + * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An + * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from + * which the measurements can be extracted out of the simulation. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +typealias Instrument<T, M> = suspend InstrumentScope<T, M>.() -> Unit + +/** + * This interface defines the scope in which an instrumentation device is built. + * + * An instrument is a [Process] without any observable state that is allowed to send messages to other [Entity] + * instances in the simulation. In addition, the instrument can emit measurements using the methods provided by the + * [SendChannel] interface. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface InstrumentScope<in T, M>: SendChannel<T>, Context<Unit, M> diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt new file mode 100644 index 00000000..880f9a15 --- /dev/null +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt @@ -0,0 +1,42 @@ +package com.atlarge.opendc.simulator.instrumentation + +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel + +/** + * A port allows users to install instrumentation devices to a [Simulation]. + * + * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) + */ +interface Port<M> { + /** + * Install the given instrumentation device to produce a stream of measurements of type <code>T</code>. + * + * The [ReceiveChannel] returned by this channel is by default backed by an unlimited buffer + * (using [Channel.UNLIMITED]), which may induce unnecessary overhead. + * + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.UNLIMITED, instrument) + + /** + * Install the given instrumentation device to produce a stream of measurements of type code>T</code>. + * + * @param capacity The capacity of the buffer of the channel. + * @param instrument The instrumentation device to install. + * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published. + */ + fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> + + /** + * Close this port and stop the instruments from producing more measurements. + * This is an idempotent operation – repeated invocations of this function have no effect and return false. + * + * @param cause An optional cause that is thrown when trying to receive more elements from the installed + * instruments. + * @return `true` if the port was closed, `false` if it was already closed. + */ + fun close(cause: Throwable? = null): Boolean +} diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt index bb2ef818..0954868a 100644 --- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt +++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt @@ -26,6 +26,8 @@ package com.atlarge.opendc.simulator.kernel import com.atlarge.opendc.simulator.Entity import com.atlarge.opendc.simulator.Instant +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.Port /** * A message based discrete event simulation over some model `M`. This interface provides direct control over the @@ -35,7 +37,7 @@ import com.atlarge.opendc.simulator.Instant * @param M The shape of the model over which the simulation runs. * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl) */ -interface Simulation<out M> { +interface Simulation<M> { /** * The model in which the simulation runs. */ @@ -52,6 +54,13 @@ interface Simulation<out M> { val <E : Entity<S, *>, S> E.state: S /** + * Open a new [Port] to manage [Instrument]s. + * + * @return A new [Port] instance to install [Instrument]s to. + */ + fun openPort(): Port<M> + + /** * Step through one cycle in the simulation. This method will process all events in a single tick, update the * internal clock and then return the control to the user. */ diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt index 532a033a..4e3aa16c 100644 --- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt +++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt @@ -25,8 +25,15 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.* +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.instrumentation.InstrumentScope +import com.atlarge.opendc.simulator.instrumentation.Port import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.channels.Channel +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.SendChannel import mu.KotlinLogging +import java.lang.ref.WeakReference import java.util.* import kotlin.coroutines.experimental.* @@ -127,6 +134,33 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot schedule(prepare(message, destination, sender, delay)) // Simulation implementation + override fun openPort(): Port<M> = object : Port<M> { + val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf() + + override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> { + val channel = Channel<T>(capacity) + val process = object : Process<Unit, M> { + override val initialState = Unit + override suspend fun Context<Unit, M>.run() { + val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {} + try { + instrument(builder) + channel.close() + } catch (cause: Throwable) { + channel.close(cause) + } + } + } + channels.add(WeakReference(channel)) + register(process) + return channel + } + + override fun close(cause: Throwable?): Boolean = channels + .map { it.get()?.close(cause) ?: false } + .any() + } + override fun step() { while (true) { val envelope = queue.peek() ?: return diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt index c47f9a26..74d6e2de 100644 --- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt +++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt @@ -27,8 +27,12 @@ package com.atlarge.opendc.omega import com.atlarge.opendc.simulator.Bootstrap import com.atlarge.opendc.simulator.Context import com.atlarge.opendc.simulator.Process +import com.atlarge.opendc.simulator.instrumentation.Instrument +import com.atlarge.opendc.simulator.kernel.Simulation +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.channels.consumeEach import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test /** @@ -123,6 +127,7 @@ internal class SmokeTest { hold(10) } } + /** * Test if the kernel allows access to the simulation model object. */ @@ -135,9 +140,29 @@ internal class SmokeTest { value } - val simulation = OmegaKernel.create(bootstrap) - simulation.run(5) + val kernel = OmegaKernel.create(bootstrap) + kernel.run(5) + } + + + @Test + fun `instrumentation works`() { + val instrument: Instrument<Int, Unit> = { + var value = 0 - assertTrue(simulation.run { process.state }) + for (i in 1..10) { + send(value) + value += 10 + hold(20) + } + } + + val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit }) + val stream = simulation.openPort().install(instrument) + + val res = async(Unconfined) { + stream.consumeEach { println(it) } + } + simulation.run(100) } } diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt index 13f322bc..ab701259 100644 --- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt +++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt @@ -45,7 +45,7 @@ fun main(args: Array<String>) { properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: "" val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties) - val timeout = 10000L + val timeout = 30000L val threads = 4 val executorService = Executors.newFixedThreadPool(threads) val experiments = JpaExperimentManager(factory) diff --git a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml index 4c4e6ac7..a091bdbd 100644 --- a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml +++ b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml @@ -35,7 +35,7 @@ <property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect"/> <property name="hibernate.show_sql" value="false"/> <property name="hibernate.hbm2ddl.auto" value="validate"/> - <property name="hibernate.jdbc.batch_size" value="50"/> + <property name="hibernate.jdbc.batch_size" value="100"/> </properties> </persistence-unit> </persistence> diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt new file mode 100644 index 00000000..439813e8 --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt @@ -0,0 +1,124 @@ +/* + * MIT License + * + * Copyright (c) 2017 atlarge-research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.atlarge.opendc.simulator.instrumentation + +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.consume +import kotlinx.coroutines.experimental.channels.produce +import kotlin.coroutines.experimental.CoroutineContext + +/** + * Interpolate [n] amount of elements between every two occurrences of elements passing through the channel. + * + * The operation is _intermediate_ and _stateless_. + * This function [consumes][consume] all elements of the original [ReceiveChannel]. + * + * @param context The context of the coroutine. + * @param n The amount of elements to interpolate between the actual elements in the channel. + * @param interpolator A function to interpolate between the two element occurrences. + */ +fun <E> ReceiveChannel<E>.interpolate(n: Int, context: CoroutineContext = Unconfined, + interpolator: (Double, E, E) -> E): ReceiveChannel<E> = + produce(context) { + consume { + val iterator = iterator() + + if (!iterator.hasNext()) + return@produce + + var a = iterator.next() + send(a) + + while (iterator.hasNext()) { + val b = iterator.next() + for (i in 1..n) { + send(interpolator(i.toDouble() / (n + 1), a, b)) + } + send(b) + a = b + } + } + } + +/** + * Perform a linear interpolation on the given double values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated double result between the double values. + */ +fun lerp(a: Double, b: Double, f: Double): Double = a + f * (b - a) + +/** + * Perform a linear interpolation on the given float values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated float result between the float values. + */ +fun lerp(a: Float, b: Float, f: Float): Float = a + f * (b - a) + +/** + * Perform a linear interpolation on the given integer values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated integer result between the integer values. + */ +fun lerp(a: Int, b: Int, f: Float): Int = lerp(a.toFloat(), b.toFloat(), f).toInt() + +/** + * Perform a linear interpolation on the given integer values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated integer result between the integer values. + */ +fun lerp(a: Int, b: Int, f: Double): Int = lerp(a.toDouble(), b.toDouble(), f).toInt() + +/** + * Perform a linear interpolation on the given long values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated long result between the long values. + */ +fun lerp(a: Long, b: Long, f: Double): Long = lerp(a.toDouble(), b.toDouble(), f).toLong() + +/** + * Perform a linear interpolation on the given long values. + * + * @param a The start value + * @param b The end value + * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1]. + * @return The interpolated long result between the long values. + */ +fun lerp(a: Long, b: Long, f: Float): Long = lerp(a.toFloat(), b.toFloat(), f).toLong() |
