summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-14 12:12:57 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2018-02-23 12:20:30 +0100
commit699338a4e7b226ae7acf0f4aef1b0b28d90eb5b3 (patch)
treefbebd90aebd1863f6bcbe4ccbb790c6c7559c91e
parenta64ae13a6c5aef435e048b13bb3d7bad449f783b (diff)
feat(#12): Implement Instrumentation API in Omega kernel
These changes implement the Instrumentation API described in issue #11 into the Omega simulation kernel.
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt2
-rw-r--r--opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt2
-rw-r--r--opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt23
-rw-r--r--opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt35
4 files changed, 56 insertions, 6 deletions
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
index 68f76569..75f7ed60 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Instrument.kt
@@ -28,4 +28,4 @@ typealias Instrument<T, M> = suspend InstrumentScope<T, M>.() -> Unit
*
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-interface InstrumentScope<in T, out M>: SendChannel<T>, Context<Unit, M>
+interface InstrumentScope<in T, M>: SendChannel<T>, Context<Unit, M>
diff --git a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
index 74f1fb36..ce6dc6a4 100644
--- a/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
+++ b/opendc-core/src/main/kotlin/com/atlarge/opendc/simulator/kernel/Simulation.kt
@@ -38,7 +38,7 @@ import kotlinx.coroutines.experimental.channels.ReceiveChannel
* @param M The shape of the model over which the simulation runs.
* @author Fabian Mastenbroek (f.s.mastenbroek@student.tudelft.nl)
*/
-interface Simulation<out M> {
+interface Simulation<M> {
/**
* The model in which the simulation runs.
*/
diff --git a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
index 532a033a..11217e8d 100644
--- a/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
+++ b/opendc-kernel-omega/src/main/kotlin/com/atlarge/opendc/omega/OmegaSimulation.kt
@@ -25,7 +25,12 @@
package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.*
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.instrumentation.InstrumentScope
import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.channels.Channel
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.SendChannel
import mu.KotlinLogging
import java.util.*
import kotlin.coroutines.experimental.*
@@ -127,6 +132,24 @@ internal class OmegaSimulation<M>(bootstrap: Bootstrap<M>) : Simulation<M>, Boot
schedule(prepare(message, destination, sender, delay))
// Simulation implementation
+ override fun <T> install(capacity: Int, instrument: Instrument<T, M>): ReceiveChannel<T> {
+ val channel = Channel<T>(capacity)
+ val process = object : Process<Unit, M> {
+ override val initialState = Unit
+ override suspend fun Context<Unit, M>.run() {
+ val builder = object : InstrumentScope<T, M>, SendChannel<T> by channel, Context<Unit, M> by this {}
+ try {
+ instrument(builder)
+ channel.close()
+ } catch (cause: Throwable) {
+ channel.close(cause)
+ }
+ }
+ }
+ register(process)
+ return channel
+ }
+
override fun step() {
while (true) {
val envelope = queue.peek() ?: return
diff --git a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
index c47f9a26..42ca05ec 100644
--- a/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
+++ b/opendc-kernel-omega/src/test/kotlin/com/atlarge/opendc/omega/SmokeTest.kt
@@ -27,8 +27,13 @@ package com.atlarge.opendc.omega
import com.atlarge.opendc.simulator.Bootstrap
import com.atlarge.opendc.simulator.Context
import com.atlarge.opendc.simulator.Process
+import com.atlarge.opendc.simulator.instrumentation.Instrument
+import com.atlarge.opendc.simulator.kernel.Kernel
+import com.atlarge.opendc.simulator.kernel.Simulation
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.async
+import kotlinx.coroutines.experimental.channels.*
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
/**
@@ -123,6 +128,7 @@ internal class SmokeTest {
hold(10)
}
}
+
/**
* Test if the kernel allows access to the simulation model object.
*/
@@ -135,9 +141,30 @@ internal class SmokeTest {
value
}
- val simulation = OmegaKernel.create(bootstrap)
- simulation.run(5)
+ val kernel = OmegaKernel.create(bootstrap)
+ kernel.run(5)
+ }
+
+
+ @Test
+ fun `instrumentation works`() {
+ val instrument: Instrument<Int, Unit> = {
+ var value = 0
+
+ for (i in 1..10) {
+ send(value)
+ value += 10
+ hold(20)
+ }
+ }
+
+ val simulation: Simulation<Unit> = OmegaKernel.create(Bootstrap.create { Unit })
+ val stream = simulation.install(instrument)
+
+ val res = async(Unconfined) {
+ stream.consumeEach { println(it) }
+ }
- assertTrue(simulation.run { process.state })
+ simulation.run(100)
}
}