diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-08 10:58:43 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-08 17:11:52 +0200 |
| commit | 54f83291aaff75ed875e507d8dbf9037d3e93710 (patch) | |
| tree | 71ea030f6433620439508bfd2a0c558a57a7e87a /opendc-simulator/opendc-simulator-flow/src | |
| parent | 774ed886ac8f84ae2974c1204534ee332d920864 (diff) | |
refactor(simulator): Simplify FlowSink implementation
This change simplifies the FlowSink implementation by not relying on the
AbstractFlowConsumer, but instead implementing the FlowConsumer
interface itself.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow/src')
5 files changed, 133 insertions, 153 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt deleted file mode 100644 index 5f1057e8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. - */ -public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { - /** - * A flag to indicate that the flow consumer is active. - */ - public override val isActive: Boolean - get() = ctx != null - - /** - * The capacity of the consumer. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The current processing rate of the consumer. - */ - public override val rate: Double - get() = ctx?.rate ?: 0.0 - - /** - * The flow processing rate demand at this instant. - */ - public override val demand: Double - get() = ctx?.demand ?: 0.0 - - /** - * The [FlowConsumerContext] that is currently running. - */ - protected var ctx: FlowConsumerContext? = null - private set - - /** - * Construct the [FlowConsumerLogic] instance for a new source. - */ - protected abstract fun createLogic(): FlowConsumerLogic - - /** - * Start the specified [FlowConsumerContext]. - */ - protected open fun start(ctx: FlowConsumerContext) { - ctx.start() - } - - /** - * The previous demand for the consumer. - */ - private var _previousDemand = 0.0 - private var _previousCapacity = 0.0 - - /** - * Update the counters of the flow consumer. - */ - protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { - val demand = _previousDemand - val capacity = _previousCapacity - - _previousDemand = ctx.demand - _previousCapacity = ctx.capacity - - if (delta <= 0) { - return - } - - val deltaS = delta / 1000.0 - val total = demand * deltaS - val work = capacity * deltaS - val actualWork = ctx.rate * deltaS - - increment(work, actualWork, (total - actualWork), 0.0) - } - - final override fun startConsumer(source: FlowSource) { - check(ctx == null) { "Consumer is in invalid state" } - val ctx = engine.newContext(source, createLogic()) - - ctx.capacity = capacity - this.ctx = ctx - - start(ctx) - } - - final override fun pull() { - ctx?.pull() - } - - final override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.close() - } - } - - override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 229fd96a..7230a966 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max @@ -241,7 +242,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } val counters = _counters - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index 170ab1c0..e9094443 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters /** @@ -35,7 +36,34 @@ public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, private val parent: FlowConvergenceListener? = null -) : AbstractFlowConsumer(engine, initialCapacity) { +) : FlowConsumer { + /** + * A flag to indicate that the flow consumer is active. + */ + public override val isActive: Boolean + get() = _ctx != null + + /** + * The capacity of the consumer. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + _ctx?.capacity = value + } + + /** + * The current processing rate of the consumer. + */ + public override val rate: Double + get() = _ctx?.rate ?: 0.0 + + /** + * The flow processing rate demand at this instant. + */ + public override val demand: Double + get() = _ctx?.demand ?: 0.0 + /** * The flow counters to track the flow metrics of the consumer. */ @@ -43,36 +71,85 @@ public class FlowSink( get() = _counters private val _counters = MutableFlowCounters() - override fun start(ctx: FlowConsumerContext) { + /** + * The current active [FlowConsumerLogic] of this sink. + */ + private var _ctx: FlowConsumerContext? = null + + override fun startConsumer(source: FlowSource) { + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, Logic(parent, _counters)) + _ctx = ctx + + ctx.capacity = capacity if (parent != null) { ctx.shouldConsumerConverge = true } - super.start(ctx) + + ctx.start() } - override fun createLogic(): FlowConsumerLogic { - return object : FlowConsumerLogic { - private val parent = this@FlowSink.parent - - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - delta: Long, - rate: Double - ) { - _counters.update(ctx, delta) - } + override fun pull() { + _ctx?.pull() + } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - _counters.update(ctx, delta) - cancel() - } + override fun cancel() { + _ctx?.close() + } + + override fun toString(): String = "FlowSink[capacity=$capacity]" + + /** + * [FlowConsumerLogic] of a sink. + */ + private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + updateCounters(ctx, delta, rate, ctx.capacity) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + updateCounters(ctx, delta, 0.0, 0.0) + + _ctx = null + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now, delta) + } + + /** + * The previous demand and capacity for the consumer. + */ + private val _previous = DoubleArray(2) + + /** + * Update the counters of the flow consumer. + */ + private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + val counters = counters + val previous = _previous + val demand = previous[0] + val capacity = previous[1] - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + previous[0] = nextDemand + previous[1] = nextCapacity + + if (delta <= 0) { + return } + + val deltaS = delta * D_MS_TO_S + val total = demand * deltaS + val work = capacity * deltaS + val actualWork = ctx.rate * deltaS + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } - - override fun toString(): String = "FlowSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt new file mode 100644 index 00000000..450195ec --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Constant for converting milliseconds into seconds. + */ +internal const val D_MS_TO_S = 1 / 1000.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index eaa3f7c5..31fb5b73 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -451,7 +452,7 @@ public class MaxMinFlowMultiplexer( return } - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val demand = demand val rate = rate @@ -653,7 +654,7 @@ public class MaxMinFlowMultiplexer( val actualRate = actualRate - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS val remaining = (_capacity - actualRate) * deltaS |
