summaryrefslogtreecommitdiff
path: root/opendc-kernel-omega/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <fabianishere@outlook.com>2018-04-22 22:27:13 +0200
committerGitHub <noreply@github.com>2018-04-22 22:27:13 +0200
commit07f245dcf4b01ade251d0f4bedc897d7145b04d1 (patch)
treea7b4c49df918e812998074f3ff71b6ba1868d645 /opendc-kernel-omega/src
parentf691a72b12a43fa15c1617966450c55206664797 (diff)
parent4ccf632ad4418114df0cd8460c7dd3a86c246f9d (diff)
feat(#12): Implement Instrumentation API
These changes contain the specification of the new Instrumentation API for the simulator, in addition to the implementation for the Omega kernel. As an example, the API allows users to measure data from processes in simulation and interpolate data points between the measurements. Closes #11, #12
Diffstat (limited to 'opendc-kernel-omega/src')
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt34
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt33
2 files changed, 63 insertions, 4 deletions
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 532a033a..4e3aa16c 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -25,8 +25,15 @@
package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
+import com.atlarge.opendc.simulator.instrumentation.Port
import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
import mu.KotlinLogging
+import java.lang.ref.WeakReference
import java.util.*
import kotlin.coroutines.experimental.*
@@ -127,6 +134,33 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
schedule(prepare(message, destination, sender, delay))
// Simulation implementation
+ override fun openPort(): Port<M> = object : Port<M> {
+ val channels: MutableSet<WeakReference<Channel<*>>> = mutableSetOf()
+
+ override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val process = object : Process<Unit, M> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, M>.run() {
+ val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
+ try {
+ instrument(builder)
+ channel.close()
+ } catch (cause: Throwable) {
+ channel.close(cause)
+ }
+ }
+ }
+ channels.add(WeakReference(channel))
+ register(process)
+ return channel
+ }
+
+ override fun close(cause: Throwable?): Boolean = channels
+ .map { it.get()?.close(cause) ?: false }
+ .any()
+ }
+
override fun step() {
while (true) {
val envelope = queue.peek() ?: return
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index c47f9a26..74d6e2de 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -27,8 +27,12 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.async
+import kotlinx.coroutines.experimental.channels.consumeEach
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
/**
@@ -123,6 +127,7 @@ internal class SmokeTest {
hold(10)
}
}
+
/**
* Test if the kernel allows access to the simulation model object.
*/
@@ -135,9 +140,29 @@ internal class SmokeTest {
value
}
- val simulation = OmegaKernel.create(bootstrap)
- simulation.run(5)
+ val kernel = OmegaKernel.create(bootstrap)
+ kernel.run(5)
+ }
+
+
+ @Test
+ fun `instrumentation works`() {
+ val instrument: Instrument<Int, Unit> = {
+ var value = 0
- assertTrue(simulation.run { process.state })
+ for (i in 1..10) {
+ send(value)
+ value += 10
+ hold(20)
+ }
+ }
+
+ val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit })
+ val stream = simulation.openPort().install(instrument)
+
+ val res = async(Unconfined) {
+ stream.consumeEach { println(it) }
+ }
+ simulation.run(100)
}
}