summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--opendc-core/build.gradle1
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt29
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt42
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt11
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt34
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt33
-rw-r--r--opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt2
-rw-r--r--opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml2
-rw-r--r--opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt124
9 files changed, 271 insertions, 7 deletions
diff --git a/opendc-core/build.gradle b/opendc-core/build.gradle
index 223e73f9..210b8820 100644
--- a/opendc-core/build.gradle
+++ b/opendc-core/build.gradle
@@ -77,6 +77,7 @@ repositories {
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
+ compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.2"
testCompile "org.junit.jupiter:junit-jupiter-api:5.0.0-RC3"
testRuntime "org.junit.jupiter:junit-jupiter-engine:5.0.0-RC3"
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
new file mode 100644
index 00000000..7633444e
--- /dev/null
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
@@ -0,0 +1,29 @@
+package com.atlarge.opendc.simulator.instrumentation
+
+import com.atlarge.opendc.simulator.Context
+import com.atlarge.opendc.simulator.Entity
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
+
+ /**
+ * A kernel instrumentation device that allows the observation and measurement of properties of interest within some
+ * model.
+ *
+ * An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An
+ * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from
+ * which the measurements can be extracted out of the simulation.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+typealias Instrument<T, M> = suspend InstrumentScope<T, M>.() -> Unit
+
+/**
+ * This interface defines the scope in which an instrumentation device is built.
+ *
+ * An instrument is a [Process] without any observable state that is allowed to send messages to other [Entity]
+ * instances in the simulation. In addition, the instrument can emit measurements using the methods provided by the
+ * [SendChannel] interface.
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface InstrumentScope<in T, M>: SendChannel<T>, Context<Unit, M>
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt
new file mode 100644
index 00000000..880f9a15
--- /dev/null
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt
@@ -0,0 +1,42 @@
+package com.atlarge.opendc.simulator.instrumentation
+
+import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+
+/**
+ * A port allows users to install instrumentation devices to a [Simulation].
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface Port<M> {
+ /**
+ * Install the given instrumentation device to produce a stream of measurements of type <code>T</code>.
+ *
+ * The [ReceiveChannel] returned by this channel is by default backed by an unlimited buffer
+ * (using [Channel.UNLIMITED]), which may induce unnecessary overhead.
+ *
+ * @param instrument The instrumentation device to install.
+ * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published.
+ */
+ fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.UNLIMITED, instrument)
+
+ /**
+ * Install the given instrumentation device to produce a stream of measurements of type code>T</code>.
+ *
+ * @param capacity The capacity of the buffer of the channel.
+ * @param instrument The instrumentation device to install.
+ * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published.
+ */
+ fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T>
+
+ /**
+ * Close this port and stop the instruments from producing more measurements.
+ * This is an idempotent operation – repeated invocations of this function have no effect and return false.
+ *
+ * @param cause An optional cause that is thrown when trying to receive more elements from the installed
+ * instruments.
+ * @return `true` if the port was closed, `false` if it was already closed.
+ */
+ fun close(cause: Throwable? = null): Boolean
+}
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
index bb2ef818..0954868a 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
@@ -26,6 +26,8 @@ package com.atlarge.opendc.simulator.kernel
import com.atlarge.opendc.simulator.Entity
import com.atlarge.opendc.simulator.Instant
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.instrumentation.Port
/**
* A message based discrete event simulation over some model `M`. This interface provides direct control over the
@@ -35,7 +37,7 @@ import com.atlarge.opendc.simulator.Instant
* @param M The shape of the model over which the simulation runs.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-interface Simulation<out M> {
+interface Simulation<M> {
/**
* The model in which the simulation runs.
*/
@@ -52,6 +54,13 @@ interface Simulation<out M> {
val <E : Entity<S, *>, S> E.state: S
/**
+ * Open a new [Port] to manage [Instrument]s.
+ *
+ * @return A new [Port] instance to install [Instrument]s to.
+ */
+ fun openPort(): Port<M>
+
+ /**
* Step through one cycle in the simulation. This method will process all events in a single tick, update the
* internal clock and then return the control to the user.
*/
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 532a033a..4e3aa16c 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -25,8 +25,15 @@
package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
+import com.atlarge.opendc.simulator.instrumentation.Port
import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
import mu.KotlinLogging
+import java.lang.ref.WeakReference
import java.util.*
import kotlin.coroutines.experimental.*
@@ -127,6 +134,33 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
schedule(prepare(message, destination, sender, delay))
// Simulation implementation
+ override fun openPort(): Port<M> = object : Port<M> {
+ val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf()
+
+ override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val process = object : Process<Unit, M> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, M>.run() {
+ val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
+ try {
+ instrument(builder)
+ channel.close()
+ } catch (cause: Throwable) {
+ channel.close(cause)
+ }
+ }
+ }
+ channels.add(WeakReference(channel))
+ register(process)
+ return channel
+ }
+
+ override fun close(cause: Throwable?): Boolean = channels
+ .map { it.get()?.close(cause) ?: false }
+ .any()
+ }
+
override fun step() {
while (true) {
val envelope = queue.peek() ?: return
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index c47f9a26..74d6e2de 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -27,8 +27,12 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.async
+import kotlinx.coroutines.experimental.channels.consumeEach
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
/**
@@ -123,6 +127,7 @@ internal class SmokeTest {
hold(10)
}
}
+
/**
* Test if the kernel allows access to the simulation model object.
*/
@@ -135,9 +140,29 @@ internal class SmokeTest {
value
}
- val simulation = OmegaKernel.create(bootstrap)
- simulation.run(5)
+ val kernel = OmegaKernel.create(bootstrap)
+ kernel.run(5)
+ }
+
+
+ @Test
+ fun `instrumentation works`() {
+ val instrument: Instrument<Int, Unit> = {
+ var value = 0
- assertTrue(simulation.run { process.state })
+ for (i in 1..10) {
+ send(value)
+ value += 10
+ hold(20)
+ }
+ }
+
+ val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit })
+ val stream = simulation.openPort().install(instrument)
+
+ val res = async(Unconfined) {
+ stream.consumeEach { println(it) }
+ }
+ simulation.run(100)
}
}
diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
index 13f322bc..ab701259 100644
--- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
+++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
@@ -45,7 +45,7 @@ fun main(args: Array<String>) {
properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: ""
val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties)
- val timeout = 10000L
+ val timeout = 30000L
val threads = 4
val executorService = Executors.newFixedThreadPool(threads)
val experiments = JpaExperimentManager(factory)
diff --git a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml
index 4c4e6ac7..a091bdbd 100644
--- a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml
+++ b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml
@@ -35,7 +35,7 @@
<property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect"/>
<property name="hibernate.show_sql" value="false"/>
<property name="hibernate.hbm2ddl.auto" value="validate"/>
- <property name="hibernate.jdbc.batch_size" value="50"/>
+ <property name="hibernate.jdbc.batch_size" value="100"/>
</properties>
</persistence-unit>
</persistence>
diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
new file mode 100644
index 00000000..439813e8
--- /dev/null
+++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
@@ -0,0 +1,124 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2017 atlarge-research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.atlarge.opendc.simulator.instrumentation
+
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consume
+import kotlinx.coroutines.experimental.channels.produce
+import kotlin.coroutines.experimental.CoroutineContext
+
+/**
+ * Interpolate [n] amount of elements between every two occurrences of elements passing through the channel.
+ *
+ * The operation is _intermediate_ and _stateless_.
+ * This function [consumes][consume] all elements of the original [ReceiveChannel].
+ *
+ * @param context The context of the coroutine.
+ * @param n The amount of elements to interpolate between the actual elements in the channel.
+ * @param interpolator A function to interpolate between the two element occurrences.
+ */
+fun <E> ReceiveChannel<E>.interpolate(n: Int, context: CoroutineContext = Unconfined,
+ interpolator: (Double, E, E) -> E): ReceiveChannel<E> =
+ produce(context) {
+ consume {
+ val iterator = iterator()
+
+ if (!iterator.hasNext())
+ return@produce
+
+ var a = iterator.next()
+ send(a)
+
+ while (iterator.hasNext()) {
+ val b = iterator.next()
+ for (i in 1..n) {
+ send(interpolator(i.toDouble() / (n + 1), a, b))
+ }
+ send(b)
+ a = b
+ }
+ }
+ }
+
+/**
+ * Perform a linear interpolation on the given double values.
+ *
+ * @param a The start value
+ * @param b The end value
+ * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1].
+ * @return The interpolated double result between the double values.
+ */
+fun lerp(a: Double, b: Double, f: Double): Double = a + f * (b - a)
+
+/**
+ * Perform a linear interpolation on the given float values.
+ *
+ * @param a The start value
+ * @param b The end value
+ * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1].
+ * @return The interpolated float result between the float values.
+ */
+fun lerp(a: Float, b: Float, f: Float): Float = a + f * (b - a)
+
+/**
+ * Perform a linear interpolation on the given integer values.
+ *
+ * @param a The start value
+ * @param b The end value
+ * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1].
+ * @return The interpolated integer result between the integer values.
+ */
+fun lerp(a: Int, b: Int, f: Float): Int = lerp(a.toFloat(), b.toFloat(), f).toInt()
+
+/**
+ * Perform a linear interpolation on the given integer values.
+ *
+ * @param a The start value
+ * @param b The end value
+ * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1].
+ * @return The interpolated integer result between the integer values.
+ */
+fun lerp(a: Int, b: Int, f: Double): Int = lerp(a.toDouble(), b.toDouble(), f).toInt()
+
+/**
+ * Perform a linear interpolation on the given long values.
+ *
+ * @param a The start value
+ * @param b The end value
+ * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1].
+ * @return The interpolated long result between the long values.
+ */
+fun lerp(a: Long, b: Long, f: Double): Long = lerp(a.toDouble(), b.toDouble(), f).toLong()
+
+/**
+ * Perform a linear interpolation on the given long values.
+ *
+ * @param a The start value
+ * @param b The end value
+ * @param f The amount to interpolate which represents the position between the two values as a percentage in [0, 1].
+ * @return The interpolated long result between the long values.
+ */
+fun lerp(a: Long, b: Long, f: Float): Long = lerp(a.toFloat(), b.toFloat(), f).toLong()