From bc814ab5b5a4becf3dbc5f796a165955c0305d70 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 11 Jul 2018 14:33:12 +0200 Subject: feat: Interpolate machine state and task progress (#21) This pull request implements interpolation of task progress (represented as the `TaskState` and `MachineState` class) via the Interpolation helpers implemented in #20. The model assumes that tasks progress linearly between two measurements (since the time between measurements is usually small). --- .../opendc/simulator/instrumentation/Helpers.kt | 48 ++++++++++++++++++++++ .../simulator/instrumentation/Interpolation.kt | 12 +++++- 2 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt (limited to 'opendc-stdlib/src/main') diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt new file mode 100644 index 00000000..d6cf2e3a --- /dev/null +++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Helpers.kt @@ -0,0 +1,48 @@ +package com.atlarge.opendc.simulator.instrumentation + +import kotlinx.coroutines.experimental.Unconfined +import kotlinx.coroutines.experimental.channels.ReceiveChannel +import kotlinx.coroutines.experimental.channels.consumeEach +import kotlinx.coroutines.experimental.channels.produce +import kotlinx.coroutines.experimental.channels.toChannel +import kotlinx.coroutines.experimental.launch +import kotlin.coroutines.experimental.CoroutineContext + +/** + * Transform each element in the channel into a [ReceiveChannel] of output elements that is then flattened into the + * output stream by emitting elements from the channels as they become available. + * + * @param context The [CoroutineContext] to run the operation in. + * @param transform The function to transform the elements into channels. + * @return The flattened [ReceiveChannel] of merged elements. + */ +fun ReceiveChannel.flatMapMerge(context: CoroutineContext = Unconfined, + transform: suspend (E) -> ReceiveChannel): ReceiveChannel = + produce(context) { + val job = launch(Unconfined) { + consumeEach { + launch(coroutineContext) { + transform(it).toChannel(this@produce) + } + } + } + job.join() + } + +/** + * Merge this channel with the other channel into an output stream by emitting elements from the channels as they + * become available. + * + * @param context The [CoroutineContext] to run the operation in. + * @param other The other channel to merge with. + * @return The [ReceiveChannel] of merged elements. + */ +fun ReceiveChannel.merge(context: CoroutineContext = Unconfined, + other: ReceiveChannel): ReceiveChannel = + produce(context) { + val job = launch(Unconfined) { + launch(coroutineContext) { toChannel(this@produce) } + launch(coroutineContext) { other.toChannel(this@produce) } + } + job.join() + } diff --git a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt index 439813e8..5a033ff3 100644 --- a/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt +++ b/opendc-stdlib/src/main/kotlin/com/atlarge/opendc/simulator/instrumentation/Interpolation.kt @@ -41,8 +41,15 @@ import kotlin.coroutines.experimental.CoroutineContext * @param interpolator A function to interpolate between the two element occurrences. */ fun ReceiveChannel.interpolate(n: Int, context: CoroutineContext = Unconfined, - interpolator: (Double, E, E) -> E): ReceiveChannel = - produce(context) { + interpolator: (Double, E, E) -> E): ReceiveChannel { + require(n >= 0) { "The amount to interpolate must be non-negative" } + + // If we do not want to interpolate any elements, just return the original channel + if (n == 0) { + return this + } + + return produce(context) { consume { val iterator = iterator() @@ -62,6 +69,7 @@ fun ReceiveChannel.interpolate(n: Int, context: CoroutineContext = Unconf } } } +} /** * Perform a linear interpolation on the given double values. -- cgit v1.2.3