summaryrefslogtreecommitdiff
path: root/opendc-stdlib/src/main
diff options
context:
space:
mode:
authorFabian Mastenbroek <fabianishere@outlook.com>2018-07-11 14:33:12 +0200
committerGitHub <noreply@github.com>2018-07-11 14:33:12 +0200
commitbc814ab5b5a4becf3dbc5f796a165955c0305d70 (patch)
tree516d547bd03b6c95f9d640a2460d67bcf711895a /opendc-stdlib/src/main
parent07f245dcf4b01ade251d0f4bedc897d7145b04d1 (diff)
feat: Interpolate machine state and task progress (#21)
This pull request implements interpolation of task progress (represented as the `TaskState` and `MachineState` class) via the Interpolation helpers implemented in #20. The model assumes that tasks progress linearly between two measurements (since the time between measurements is usually small).
Diffstat (limited to 'opendc-stdlib/src/main')
-rw-r--r--opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt48
-rw-r--r--opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt12
2 files changed, 58 insertions, 2 deletions
diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt
new file mode 100644
index 00000000..d6cf2e3a
--- /dev/null
+++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt
@@ -0,0 +1,48 @@
+package com.atlarge.opendc.simulator.instrumentation
+
+import kotlinx.coroutines.experimental.Unconfined
+import kotlinx.coroutines.experimental.channels.ReceiveChannel
+import kotlinx.coroutines.experimental.channels.consumeEach
+import kotlinx.coroutines.experimental.channels.produce
+import kotlinx.coroutines.experimental.channels.toChannel
+import kotlinx.coroutines.experimental.launch
+import kotlin.coroutines.experimental.CoroutineContext
+
+/**
+ * Transform each element in the channel into a [ReceiveChannel] of output elements that is then flattened into the
+ * output stream by emitting elements from the channels as they become available.
+ *
+ * @param context The [CoroutineContext] to run the operation in.
+ * @param transform The function to transform the elements into channels.
+ * @return The flattened [ReceiveChannel] of merged elements.
+ */
+fun <E, R> ReceiveChannel<E>.flatMapMerge(context: CoroutineContext = Unconfined,
+ transform: suspend (E) -> ReceiveChannel<R>): ReceiveChannel<R> =
+ produce(context) {
+ val job = launch(Unconfined) {
+ consumeEach {
+ launch(coroutineContext) {
+ transform(it).toChannel(this@produce)
+ }
+ }
+ }
+ job.join()
+ }
+
+/**
+ * Merge this channel with the other channel into an output stream by emitting elements from the channels as they
+ * become available.
+ *
+ * @param context The [CoroutineContext] to run the operation in.
+ * @param other The other channel to merge with.
+ * @return The [ReceiveChannel] of merged elements.
+ */
+fun <E, E1: E, E2: E> ReceiveChannel<E1>.merge(context: CoroutineContext = Unconfined,
+ other: ReceiveChannel<E2>): ReceiveChannel<E> =
+ produce(context) {
+ val job = launch(Unconfined) {
+ launch(coroutineContext) { toChannel(this@produce) }
+ launch(coroutineContext) { other.toChannel(this@produce) }
+ }
+ job.join()
+ }
diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
index 439813e8..5a033ff3 100644
--- a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
+++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt
@@ -41,8 +41,15 @@ import kotlin.coroutines.experimental.CoroutineContext
* @param interpolator A function to interpolate between the two element occurrences.
*/
fun <E> ReceiveChannel<E>.interpolate(n: Int, context: CoroutineContext = Unconfined,
- interpolator: (Double, E, E) -> E): ReceiveChannel<E> =
- produce(context) {
+ interpolator: (Double, E, E) -> E): ReceiveChannel<E> {
+ require(n >= 0) { "The amount to interpolate must be non-negative" }
+
+ // If we do not want to interpolate any elements, just return the original channel
+ if (n == 0) {
+ return this
+ }
+
+ return produce(context) {
consume {
val iterator = iterator()
@@ -62,6 +69,7 @@ fun <E> ReceiveChannel<E>.interpolate(n: Int, context: CoroutineContext = Unconf
}
}
}
+}
/**
* Perform a linear interpolation on the given double values.