summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-27 10:17:46 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2018-04-22 22:08:11 +0200
commit2c980f4627309b55a160dbe7fc17156201d9dde4 (patch)
treee7fef0c75acc1753ac1ac55736cff926b3e16182
parent8bc180adfcbde4cd977474174b846b2fa9dfec5b (diff)
feat(#12): Allow closing of instrument streams
This change in Instrumentation API allows the user to close the data stream of an instrument by introducing a new concept: Port. A user can open a `Port` for a `Simulation` object and attach an arbitrary amount of instruments to this port. The data streams are closed by calling `Port#close()`.
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt10
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt44
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt26
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt37
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt6
-rw-r--r--opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt2
-rw-r--r--opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml2
7 files changed, 80 insertions, 47 deletions
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
index 75f7ed60..7633444e 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
@@ -1,19 +1,17 @@
package com.atlarge.opendc.simulator.instrumentation
-import com.atlarge.opendc.simulator.kernel.Kernel
-import com.atlarge.opendc.simulator.Entity
import com.atlarge.opendc.simulator.Context
+import com.atlarge.opendc.simulator.Entity
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
-/**
+ /**
* A kernel instrumentation device that allows the observation and measurement of properties of interest within some
* model.
*
* An instrument is a [Process] that emits measurements from within some model in the form of a typed stream. An
- * instrument is attached to a simulation using the [Kernel.install] method, which returns a [ReceiveChannel] from which
- * the measurements can be extracted out of the simulation.
- *
+ * instrument is attached to a simulation using the [Port.install] method, which returns a [ReceiveChannel] from
+ * which the measurements can be extracted out of the simulation.
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt
new file mode 100644
index 00000000..caa11761
--- /dev/null
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Port.kt
@@ -0,0 +1,44 @@
+package com.atlarge.opendc.simulator.instrumentation
+
+import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+
+/**
+ * A port allows users to install instrumentation devices to a [Simulation].
+ *
+ * @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
+ */
+interface Port<M> {
+ /**
+ * Install the given instrumentation device to produce a stream of measurements of type <code>T</code>.
+ *
+ * The [ReceiveChannel] returned by this channel is by default unlimited, which means the channel buffers at most
+ * one measurement, so that the receiver always gets the most recently sent element.
+ * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while
+ * previously sent elements are lost.
+ *
+ * @param instrument The instrumentation device to install.
+ * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published.
+ */
+ fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.CONFLATED, instrument)
+
+ /**
+ * Install the given instrumentation device to produce a stream of measurements of type code>T</code>.
+ *
+ * @param capacity The capacity of the buffer of the channel.
+ * @param instrument The instrumentation device to install.
+ * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published.
+ */
+ fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T>
+
+ /**
+ * Close this port and stop the instruments from producing more measurements.
+ * This is an idempotent operation – repeated invocations of this function have no effect and return false.
+ *
+ * @param cause An optional cause that is thrown when trying to receive more elements from the installed
+ * instruments.
+ * @return `true` if the port was closed, `false` if it was already closed.
+ */
+ fun close(cause: Throwable? = null): Boolean
+}
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
index ce6dc6a4..0954868a 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
@@ -27,8 +27,7 @@ package com.atlarge.opendc.simulator.kernel
import com.atlarge.opendc.simulator.Entity
import com.atlarge.opendc.simulator.Instant
import com.atlarge.opendc.simulator.instrumentation.Instrument
-import kotlinx.coroutines.experimental.channels.Channel
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import com.atlarge.opendc.simulator.instrumentation.Port
/**
* A message based discrete event simulation over some model `M`. This interface provides direct control over the
@@ -55,28 +54,11 @@ interface Simulation<M> {
val <E : Entity<S, *>, S> E.state: S
/**
- * Install the given instrumentation device in this kernel to produce a stream of measurements of type
- * <code>T</code>.
+ * Open a new [Port] to manage [Instrument]s.
*
- * The [ReceiveChannel] returned by this channel is by default conflated, which means the channel buffers at most
- * one measurement, so that the receiver always gets the most recently sent element.
- * Back-to-send sent measurements are conflated – only the the most recently sent element is received, while
- * previously sent elements are lost.
- *
- * @param instrument The instrumentation device to install.
- * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published.
- */
- fun <T> install(instrument: Instrument<T, M>): ReceiveChannel<T> = install(Channel.CONFLATED, instrument)
-
- /**
- * Install the given instrumentation device in this kernel to produce a stream of measurements of type
- * <code>T</code>.
- *
- * @param capacity The capacity of the buffer of the channel.
- * @param instrument The instrumentation device to install.
- * @return A [ReceiveChannel] to which the of measurements produced by the instrument are published.
+ * @return A new [Port] instance to install [Instrument]s to.
*/
- fun <T> install(capacity: Int = Channel.CONFLATED, instrument: Instrument<T, M>): ReceiveChannel<T>
+ fun openPort(): Port<M>
/**
* Step through one cycle in the simulation. This method will process all events in a single tick, update the
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 11217e8d..4e3aa16c 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -27,11 +27,13 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
import com.atlarge.opendc.simulator.instrumentation.Instrument
import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
+import com.atlarge.opendc.simulator.instrumentation.Port
import com.atlarge.opendc.simulator.kernel.Simulation
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import mu.KotlinLogging
+import java.lang.ref.WeakReference
import java.util.*
import kotlin.coroutines.experimental.*
@@ -132,22 +134,31 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
schedule(prepare(message, destination, sender, delay))
// Simulation implementation
- override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
- val channel = Channel<T>(capacity)
- val process = object : Process<Unit, M> {
- override val initialState = Unit
- override suspend fun Context<Unit, M>.run() {
- val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
- try {
- instrument(builder)
- channel.close()
- } catch (cause: Throwable) {
- channel.close(cause)
+ override fun openPort(): Port<M> = object : Port<M> {
+ val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf()
+
+ override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val process = object : Process<Unit, M> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, M>.run() {
+ val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
+ try {
+ instrument(builder)
+ channel.close()
+ } catch (cause: Throwable) {
+ channel.close(cause)
+ }
}
}
+ channels.add(WeakReference(channel))
+ register(process)
+ return channel
}
- register(process)
- return channel
+
+ override fun close(cause: Throwable?): Boolean = channels
+ .map { it.get()?.close(cause) ?: false }
+ .any()
}
override fun step() {
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index 42ca05ec..74d6e2de 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -28,11 +28,10 @@ import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
import com.atlarge.opendc.simulator.instrumentation.Instrument
-import com.atlarge.opendc.simulator.kernel.Kernel
import com.atlarge.opendc.simulator.kernel.Simulation
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.async
-import kotlinx.coroutines.experimental.channels.*
+import kotlinx.coroutines.experimental.channels.consumeEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@@ -159,12 +158,11 @@ internal class SmokeTest {
}
val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit })
- val stream = simulation.install(instrument)
+ val stream = simulation.openPort().install(instrument)
val res = async(Unconfined) {
stream.consumeEach { println(it) }
}
-
simulation.run(100)
}
}
diff --git a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
index 13f322bc..ab701259 100644
--- a/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
+++ b/opendc-model-odc/setup/src/main/kotlin/com/atlarge/opendc/model/odc/platform/JpaPlatformRunner.kt
@@ -45,7 +45,7 @@ fun main(args: Array<String>) {
properties["javax.persistence.jdbc.password"] = env["PERSISTENCE_PASSWORD"] ?: ""
val factory = Persistence.createEntityManagerFactory("opendc-simulator", properties)
- val timeout = 10000L
+ val timeout = 30000L
val threads = 4
val executorService = Executors.newFixedThreadPool(threads)
val experiments = JpaExperimentManager(factory)
diff --git a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml
index 4c4e6ac7..a091bdbd 100644
--- a/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml
+++ b/opendc-model-odc/setup/src/main/resources/META-INF/persistence.xml
@@ -35,7 +35,7 @@
<property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect"/>
<property name="hibernate.show_sql" value="false"/>
<property name="hibernate.hbm2ddl.auto" value="validate"/>
- <property name="hibernate.jdbc.batch_size" value="50"/>
+ <property name="hibernate.jdbc.batch_size" value="100"/>
</properties>
</persistence-unit>
</persistence>