From a0340a8752c4c4ed8413944b1dfb81b9481b6556 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 6 Oct 2021 14:29:31 +0200 Subject: perf(simulator): Skip fair-share algorithm if capacity remaining This change updates the MaxMinFlowMultiplexer implementation to skip the fair-share algorithm in case the total demand is lower than the available capacity. In this case, no re-division of capacity is necessary. --- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 60 ++++++++++++++-------- 1 file changed, 39 insertions(+), 21 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 97059e93..9131eb54 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -372,6 +372,8 @@ public class MaxMinFlowMultiplexer( val capacity = capacity var availableCapacity = capacity + var deadline = Long.MAX_VALUE + var demand = 0.0 // Pull in the work of the outputs val inputIterator = activeInputs.listIterator() @@ -382,32 +384,36 @@ public class MaxMinFlowMultiplexer( if (!input.isActive) { input.actualRate = 0.0 inputIterator.remove() + } else { + demand += input.limit + deadline = min(deadline, input.deadline) } } - var demand = 0.0 - var deadline = Long.MAX_VALUE + val rate = if (demand > capacity) { + // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the + // constrained capacity across the inputs. - // Sort in-place the inputs based on their pushed flow. - // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + // Sort in-place the inputs based on their pushed flow. + // Profiling shows that it is faster than maintaining some kind of sorted set. + activeInputs.sort() - // Divide the available output capacity fairly over the inputs using max-min fair sharing - val size = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / (size - i) - val grantedRate = min(input.allowedRate, availableShare) + // Divide the available output capacity fairly over the inputs using max-min fair sharing + val size = activeInputs.size + for (i in activeInputs.indices) { + val input = activeInputs[i] + val availableShare = availableCapacity / (size - i) + val grantedRate = min(input.allowedRate, availableShare) - demand += input.limit - deadline = min(deadline, input.deadline) - availableCapacity -= grantedRate + availableCapacity -= grantedRate + input.actualRate = grantedRate + } - input.actualRate = grantedRate + capacity - availableCapacity + } else { + demand } - val rate = capacity - availableCapacity - this.demand = demand this.rate = rate @@ -467,6 +473,11 @@ public class MaxMinFlowMultiplexer( */ @JvmField var actualRate: Double = 0.0 + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + /** * The deadline of the input. */ @@ -474,10 +485,14 @@ public class MaxMinFlowMultiplexer( get() = ctx?.deadline ?: Long.MAX_VALUE /** - * The processing rate that is allowed by the model constraints. + * The capacity of the input. */ - val allowedRate: Double - get() = min(capacity, limit) + override var capacity: Double + get() = super.capacity + set(value) { + allowedRate = min(limit, value) + super.capacity = value + } /** * A flag to enable timers for the input. @@ -530,8 +545,10 @@ public class MaxMinFlowMultiplexer( ) { doUpdateCounters(delta) - actualRate = 0.0 + val allowed = min(rate, capacity) limit = rate + actualRate = allowed + allowedRate = allowed scheduler.trigger(now) } @@ -541,6 +558,7 @@ public class MaxMinFlowMultiplexer( limit = 0.0 actualRate = 0.0 + allowedRate = 0.0 scheduler.deregisterInput(this, now) -- cgit v1.2.3 From 774ed886ac8f84ae2974c1204534ee332d920864 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 7 Oct 2021 14:39:03 +0200 Subject: fix(simulator): Count interference for multiplexer inputs This change updates the SimAbstractHypervisor and MaxMinFlowMultiplexer to count interference of multiplexer inputs, instead of only counting them for the scheduler as a whole. --- .../opendc/simulator/flow/AbstractFlowConsumer.kt | 26 +--- .../org/opendc/simulator/flow/FlowForwarder.kt | 9 +- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 12 +- .../simulator/flow/internal/FlowCountersImpl.kt | 46 ------- .../simulator/flow/internal/MutableFlowCounters.kt | 56 ++++++++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 147 ++++++++++++++------- 6 files changed, 175 insertions(+), 121 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index b02426e3..5f1057e8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.flow -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters /** * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. @@ -55,13 +55,6 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi public override val demand: Double get() = ctx?.demand ?: 0.0 - /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() - /** * The [FlowConsumerContext] that is currently running. */ @@ -89,7 +82,7 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * Update the counters of the flow consumer. */ - protected fun updateCounters(ctx: FlowConnection, delta: Long) { + protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { val demand = _previousDemand val capacity = _previousCapacity @@ -100,25 +93,12 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi return } - val counters = _counters val deltaS = delta / 1000.0 val total = demand * deltaS val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) - } - - /** - * Update the counters of the flow consumer. - */ - protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters.demand += demand - counters.actual += actual - counters.remaining += remaining + increment(work, actualWork, (total - actualWork), 0.0) } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7eaaf6c2..229fd96a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max /** @@ -117,7 +117,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override val counters: FlowCounters get() = _counters - private val _counters = FlowCountersImpl() + private val _counters = MutableFlowCounters() override fun startConsumer(source: FlowSource) { check(delegate == null) { "Forwarder already active" } @@ -245,8 +245,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index b4eb6a38..170ab1c0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.MutableFlowCounters + /** * A [FlowSink] represents a sink with a fixed capacity. * @@ -34,6 +36,12 @@ public class FlowSink( initialCapacity: Double, private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() override fun start(ctx: FlowConsumerContext) { if (parent != null) { @@ -52,11 +60,11 @@ public class FlowSink( delta: Long, rate: Double ) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt deleted file mode 100644 index d2fa5228..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow.internal - -import org.opendc.simulator.flow.FlowCounters - -/** - * Mutable implementation of the [FlowCounters] interface. - */ -internal class FlowCountersImpl : FlowCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var remaining: Double = 0.0 - override var interference: Double = 0.0 - - override fun reset() { - demand = 0.0 - actual = 0.0 - remaining = 0.0 - interference = 0.0 - } - - override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" - } -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt new file mode 100644 index 00000000..d990dc61 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import org.opendc.simulator.flow.FlowCounters + +/** + * Mutable implementation of the [FlowCounters] interface. + */ +public class MutableFlowCounters : FlowCounters { + override val demand: Double + get() = _counters[0] + override val actual: Double + get() = _counters[1] + override val remaining: Double + get() = _counters[2] + override val interference: Double + get() = _counters[3] + private val _counters = DoubleArray(4) + + override fun reset() { + _counters.fill(0.0) + } + + public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + val counters = _counters + counters[0] += demand + counters[1] += actual + counters[2] += remaining + counters[3] += interference + } + + override fun toString(): String { + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 9131eb54..eaa3f7c5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -85,7 +85,7 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key) + val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) _inputs.add(provider) return provider } @@ -135,11 +135,11 @@ public class MaxMinFlowMultiplexer( /** * Helper class containing the scheduler state. */ - private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { /** * The flow counters of this scheduler. */ - @JvmField val counters = FlowCountersImpl() + @JvmField val counters = MutableFlowCounters() /** * The flow rate of the multiplexer. @@ -183,6 +183,11 @@ public class MaxMinFlowMultiplexer( private var _lastConverge: Long = Long.MIN_VALUE private var _lastConvergeInput: Input? = null + /** + * The simulation clock. + */ + private val _clock = engine.clock + /** * Register the specified [input] to this scheduler. */ @@ -195,7 +200,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity - trigger(engine.clock.millis()) + trigger(_clock.millis()) } /** @@ -447,10 +452,15 @@ public class MaxMinFlowMultiplexer( } val deltaS = delta / 1000.0 - - counters.demand += demand * deltaS - counters.actual += rate * deltaS - counters.remaining += (previousCapacity - rate) * deltaS + val demand = demand + val rate = rate + + counters.increment( + demand = demand * deltaS, + actual = rate * deltaS, + remaining = (previousCapacity - rate) * deltaS, + interference = 0.0 + ) } } @@ -458,41 +468,48 @@ public class MaxMinFlowMultiplexer( * An internal [FlowConsumer] implementation for multiplexer inputs. */ private class Input( - engine: FlowEngine, + private val engine: FlowEngine, private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey? - ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable { - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - + @JvmField val key: InterferenceKey?, + initialCapacity: Double, + ) : FlowConsumer, FlowConsumerLogic, Comparable { /** - * The actual processing speed. + * A flag to indicate that the consumer is active. */ - @JvmField var actualRate: Double = 0.0 + override val isActive: Boolean + get() = _ctx != null /** - * The processing rate that is allowed by the model constraints. + * The demand of the consumer. */ - @JvmField var allowedRate: Double = 0.0 + override val demand: Double + get() = limit /** - * The deadline of the input. + * The processing rate of the consumer. */ - val deadline: Long - get() = ctx?.deadline ?: Long.MAX_VALUE + override val rate: Double + get() = actualRate /** * The capacity of the input. */ override var capacity: Double - get() = super.capacity + get() = _capacity set(value) { allowedRate = min(limit, value) - super.capacity = value + _capacity = value + _ctx?.capacity = value } + private var _capacity = initialCapacity + + /** + * The flow counters to track the flow metrics of the consumer. + */ + override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() /** * A flag to enable timers for the input. @@ -500,7 +517,7 @@ public class MaxMinFlowMultiplexer( var enableTimers: Boolean = true set(value) { field = value - ctx?.enableTimers = value + _ctx?.enableTimers = value } /** @@ -509,9 +526,35 @@ public class MaxMinFlowMultiplexer( var shouldConsumerConverge: Boolean = true set(value) { field = value - ctx?.shouldConsumerConverge = value + _ctx?.shouldConsumerConverge = value } + /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + + /** + * The deadline of the input. + */ + val deadline: Long + get() = _ctx?.deadline ?: Long.MAX_VALUE + + /** + * The [FlowConsumerContext] that is currently running. + */ + private var _ctx: FlowConsumerContext? = null + /** * A flag to indicate that the input is closed. */ @@ -527,13 +570,33 @@ public class MaxMinFlowMultiplexer( cancel() } - /* AbstractFlowConsumer */ - override fun createLogic(): FlowConsumerLogic = this + /** + * Pull the source if necessary. + */ + fun pullSync() { + _ctx?.pullSync() + } - override fun start(ctx: FlowConsumerContext) { + /* FlowConsumer */ + override fun startConsumer(source: FlowSource) { check(!_isClosed) { "Cannot re-use closed input" } + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, this) + _ctx = ctx + + ctx.capacity = capacity scheduler.registerInput(this) - super.start(ctx) + + ctx.start() + } + + override fun pull() { + _ctx?.pull() + } + + override fun cancel() { + _ctx?.close() } /* FlowConsumerLogic */ @@ -562,8 +625,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterInput(this, now) - // BUG: Cancel the connection so that `ctx` is set to `null` - cancel() + _ctx = null } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -573,13 +635,6 @@ public class MaxMinFlowMultiplexer( /* Comparable */ override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) - /** - * Pull the source if necessary. - */ - fun pullSync() { - ctx?.pullSync() - } - /** * Helper method to update the flow counters of the multiplexer. */ @@ -596,14 +651,16 @@ public class MaxMinFlowMultiplexer( 1.0 } + val actualRate = actualRate + val deltaS = delta / 1000.0 val demand = limit * deltaS val actual = actualRate * deltaS - val remaining = (capacity - actualRate) * deltaS - - updateCounters(demand, actual, remaining) + val remaining = (_capacity - actualRate) * deltaS + val interference = actual * max(0.0, 1 - perfScore) - scheduler.counters.interference += actual * max(0.0, 1 - perfScore) + _counters.increment(demand, actual, remaining, interference) + scheduler.counters.increment(0.0, 0.0, 0.0, interference) } } -- cgit v1.2.3 From 54f83291aaff75ed875e507d8dbf9037d3e93710 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 10:58:43 +0200 Subject: refactor(simulator): Simplify FlowSink implementation This change simplifies the FlowSink implementation by not relying on the AbstractFlowConsumer, but instead implementing the FlowConsumer interface itself. --- .../opendc/simulator/flow/AbstractFlowConsumer.kt | 127 --------------------- .../org/opendc/simulator/flow/FlowForwarder.kt | 3 +- .../kotlin/org/opendc/simulator/flow/FlowSink.kt | 123 ++++++++++++++++---- .../opendc/simulator/flow/internal/Constants.kt | 28 +++++ .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 5 +- 5 files changed, 133 insertions(+), 153 deletions(-) delete mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt deleted file mode 100644 index 5f1057e8..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (c) 2021 AtLarge Research - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package org.opendc.simulator.flow - -import org.opendc.simulator.flow.internal.MutableFlowCounters - -/** - * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. - */ -public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initialCapacity: Double) : FlowConsumer { - /** - * A flag to indicate that the flow consumer is active. - */ - public override val isActive: Boolean - get() = ctx != null - - /** - * The capacity of the consumer. - */ - public override var capacity: Double = initialCapacity - set(value) { - field = value - ctx?.capacity = value - } - - /** - * The current processing rate of the consumer. - */ - public override val rate: Double - get() = ctx?.rate ?: 0.0 - - /** - * The flow processing rate demand at this instant. - */ - public override val demand: Double - get() = ctx?.demand ?: 0.0 - - /** - * The [FlowConsumerContext] that is currently running. - */ - protected var ctx: FlowConsumerContext? = null - private set - - /** - * Construct the [FlowConsumerLogic] instance for a new source. - */ - protected abstract fun createLogic(): FlowConsumerLogic - - /** - * Start the specified [FlowConsumerContext]. - */ - protected open fun start(ctx: FlowConsumerContext) { - ctx.start() - } - - /** - * The previous demand for the consumer. - */ - private var _previousDemand = 0.0 - private var _previousCapacity = 0.0 - - /** - * Update the counters of the flow consumer. - */ - protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { - val demand = _previousDemand - val capacity = _previousCapacity - - _previousDemand = ctx.demand - _previousCapacity = ctx.capacity - - if (delta <= 0) { - return - } - - val deltaS = delta / 1000.0 - val total = demand * deltaS - val work = capacity * deltaS - val actualWork = ctx.rate * deltaS - - increment(work, actualWork, (total - actualWork), 0.0) - } - - final override fun startConsumer(source: FlowSource) { - check(ctx == null) { "Consumer is in invalid state" } - val ctx = engine.newContext(source, createLogic()) - - ctx.capacity = capacity - this.ctx = ctx - - start(ctx) - } - - final override fun pull() { - ctx?.pull() - } - - final override fun cancel() { - val ctx = ctx - if (ctx != null) { - this.ctx = null - ctx.close() - } - } - - override fun toString(): String = "AbstractFlowConsumer[capacity=$capacity]" -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 229fd96a..7230a966 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,6 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max @@ -241,7 +242,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled } val counters = _counters - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index 170ab1c0..e9094443 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,7 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters /** @@ -35,7 +36,34 @@ public class FlowSink( private val engine: FlowEngine, initialCapacity: Double, private val parent: FlowConvergenceListener? = null -) : AbstractFlowConsumer(engine, initialCapacity) { +) : FlowConsumer { + /** + * A flag to indicate that the flow consumer is active. + */ + public override val isActive: Boolean + get() = _ctx != null + + /** + * The capacity of the consumer. + */ + public override var capacity: Double = initialCapacity + set(value) { + field = value + _ctx?.capacity = value + } + + /** + * The current processing rate of the consumer. + */ + public override val rate: Double + get() = _ctx?.rate ?: 0.0 + + /** + * The flow processing rate demand at this instant. + */ + public override val demand: Double + get() = _ctx?.demand ?: 0.0 + /** * The flow counters to track the flow metrics of the consumer. */ @@ -43,36 +71,85 @@ public class FlowSink( get() = _counters private val _counters = MutableFlowCounters() - override fun start(ctx: FlowConsumerContext) { + /** + * The current active [FlowConsumerLogic] of this sink. + */ + private var _ctx: FlowConsumerContext? = null + + override fun startConsumer(source: FlowSource) { + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, Logic(parent, _counters)) + _ctx = ctx + + ctx.capacity = capacity if (parent != null) { ctx.shouldConsumerConverge = true } - super.start(ctx) + + ctx.start() } - override fun createLogic(): FlowConsumerLogic { - return object : FlowConsumerLogic { - private val parent = this@FlowSink.parent - - override fun onPush( - ctx: FlowConsumerContext, - now: Long, - delta: Long, - rate: Double - ) { - _counters.update(ctx, delta) - } + override fun pull() { + _ctx?.pull() + } - override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - _counters.update(ctx, delta) - cancel() - } + override fun cancel() { + _ctx?.close() + } + + override fun toString(): String = "FlowSink[capacity=$capacity]" + + /** + * [FlowConsumerLogic] of a sink. + */ + private inner class Logic(private val parent: FlowConvergenceListener?, private val counters: MutableFlowCounters) : FlowConsumerLogic { + override fun onPush( + ctx: FlowConsumerContext, + now: Long, + delta: Long, + rate: Double + ) { + updateCounters(ctx, delta, rate, ctx.capacity) + } + + override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { + updateCounters(ctx, delta, 0.0, 0.0) + + _ctx = null + } + + override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { + parent?.onConverge(now, delta) + } + + /** + * The previous demand and capacity for the consumer. + */ + private val _previous = DoubleArray(2) + + /** + * Update the counters of the flow consumer. + */ + private fun updateCounters(ctx: FlowConnection, delta: Long, nextDemand: Double, nextCapacity: Double) { + val counters = counters + val previous = _previous + val demand = previous[0] + val capacity = previous[1] - override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { - parent?.onConverge(now, delta) + previous[0] = nextDemand + previous[1] = nextCapacity + + if (delta <= 0) { + return } + + val deltaS = delta * D_MS_TO_S + val total = demand * deltaS + val work = capacity * deltaS + val actualWork = ctx.rate * deltaS + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } - - override fun toString(): String = "FlowSink[capacity=$capacity]" } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt new file mode 100644 index 00000000..450195ec --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/Constants.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Constant for converting milliseconds into seconds. + */ +internal const val D_MS_TO_S = 1 / 1000.0 diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index eaa3f7c5..31fb5b73 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -451,7 +452,7 @@ public class MaxMinFlowMultiplexer( return } - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val demand = demand val rate = rate @@ -653,7 +654,7 @@ public class MaxMinFlowMultiplexer( val actualRate = actualRate - val deltaS = delta / 1000.0 + val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS val remaining = (_capacity - actualRate) * deltaS -- cgit v1.2.3 From 043ad5b2713164c76b09ba8cd07af9f0ca1f35e4 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 11:16:49 +0200 Subject: perf(simulator): Do not update outputs if rate is unchanged --- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 31fb5b73..28743276 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -421,16 +421,19 @@ public class MaxMinFlowMultiplexer( } this.demand = demand - this.rate = rate - - // Divide the requests over the available capacity of the input resources fairly - for (i in activeOutputs.indices) { - val output = activeOutputs[i] - val inputCapacity = output.capacity - val fraction = inputCapacity / capacity - val grantedSpeed = rate * fraction - - output.push(grantedSpeed) + if (this.rate != rate) { + // Only update the outputs if the output rate has changed + this.rate = rate + + // Divide the requests over the available capacity of the input resources fairly + for (i in activeOutputs.indices) { + val output = activeOutputs[i] + val inputCapacity = output.capacity + val fraction = inputCapacity / capacity + val grantedSpeed = rate * fraction + + output.push(grantedSpeed) + } } return deadline -- cgit v1.2.3 From 4623316cb23ce95cb8ce8db0987f948a8dc1a349 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 11:54:45 +0200 Subject: perf(simulator): Eliminate ArrayList iteration overhead This change eliminates the overhead caused by ArrayList iteration in the MaxMinFlowMultiplexer class. --- .../simulator/flow/mux/MaxMinFlowMultiplexer.kt | 40 ++++++++++++++++------ 1 file changed, 29 insertions(+), 11 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 28743276..eab5b299 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -167,6 +167,11 @@ public class MaxMinFlowMultiplexer( */ private val _activeInputs = mutableListOf() + /** + * An array containing the active inputs, which is used to reduce the overhead of an [ArrayList]. + */ + private var _inputArray = emptyArray() + /** * The active outputs registered with the scheduler. */ @@ -194,6 +199,7 @@ public class MaxMinFlowMultiplexer( */ fun registerInput(input: Input) { _activeInputs.add(input) + _inputArray = _activeInputs.toTypedArray() val hasActivationOutput = activationOutput != null @@ -201,6 +207,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity + trigger(_clock.millis()) } @@ -213,6 +220,8 @@ public class MaxMinFlowMultiplexer( _lastConvergeInput = null } + _activeInputs.remove(input) + // Re-run scheduler to distribute new load trigger(now) } @@ -365,12 +374,14 @@ public class MaxMinFlowMultiplexer( private fun doRunScheduler(delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs + var inputArray = _inputArray + var inputSize = _inputArray.size // Update the counters of the scheduler updateCounters(delta) // If there is no work yet, mark the inputs as idle. - if (activeInputs.isEmpty()) { + if (inputSize == 0) { demand = 0.0 rate = 0.0 return Long.MAX_VALUE @@ -380,35 +391,42 @@ public class MaxMinFlowMultiplexer( var availableCapacity = capacity var deadline = Long.MAX_VALUE var demand = 0.0 + var shouldRebuild = false - // Pull in the work of the outputs - val inputIterator = activeInputs.listIterator() - for (input in inputIterator) { + // Pull in the work of the inputs + for (i in 0 until inputSize) { + val input = inputArray[i] input.pullSync() - // Remove outputs that have finished + // Remove inputs that have finished if (!input.isActive) { input.actualRate = 0.0 - inputIterator.remove() + shouldRebuild = true } else { demand += input.limit deadline = min(deadline, input.deadline) } } + // Slow-path: Rebuild the input array based on the (apparently) updated `activeInputs` + if (shouldRebuild) { + inputArray = activeInputs.toTypedArray() + inputSize = inputArray.size + _inputArray = inputArray + } + val rate = if (demand > capacity) { // If the demand is higher than the capacity, we need use max-min fair sharing to distribute the // constrained capacity across the inputs. // Sort in-place the inputs based on their pushed flow. // Profiling shows that it is faster than maintaining some kind of sorted set. - activeInputs.sort() + inputArray.sort() // Divide the available output capacity fairly over the inputs using max-min fair sharing - val size = activeInputs.size - for (i in activeInputs.indices) { - val input = activeInputs[i] - val availableShare = availableCapacity / (size - i) + for (i in 0 until inputSize) { + val input = inputArray[i] + val availableShare = availableCapacity / (inputSize - i) val grantedRate = min(input.allowedRate, availableShare) availableCapacity -= grantedRate -- cgit v1.2.3 From f9483bc5782d86637777c0d21c383ce3e2c0851b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 12:23:48 +0200 Subject: perf(simulator): Optimize clock storage --- .../org/opendc/simulator/flow/internal/FlowEngineImpl.kt | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index a9234abf..450556f8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * @param context The coroutine context to use. * @param clock The virtual simulation clock. */ -internal class FlowEngineImpl(private val context: CoroutineContext, override val clock: Clock) : FlowEngine, Runnable { +internal class FlowEngineImpl(private val context: CoroutineContext, clock: Clock) : FlowEngine, Runnable { /** * The [Delay] instance that provides scheduled execution of [Runnable]s. */ @@ -70,6 +70,13 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va */ private var batchIndex = 0 + /** + * The virtual [Clock] of this engine. + */ + override val clock: Clock + get() = _clock + private val _clock: Clock = clock + /** * Update the specified [ctx] synchronously. */ @@ -113,7 +120,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va try { // Flush the work if the engine is not already running if (batchIndex == 1 && queue.isNotEmpty()) { - doRunEngine(clock.millis()) + doRunEngine(_clock.millis()) } } finally { batchIndex-- @@ -122,7 +129,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, override va /* Runnable */ override fun run() { - val now = clock.millis() + val now = _clock.millis() val invocation = futureInvocations.poll() // Clear invocation from future invocation queue assert(now >= invocation.timestamp) { "Future invocations invariant violated" } -- cgit v1.2.3 From 4433185388f843140aad096dfdd88dfe2398bf2b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 16:49:55 +0200 Subject: perf(simulator): Specialize FlowEngine queues This change specializes the queues used by the FlowEngine implementation in order to reduce the overhead caused by type-erasure of generics. --- .../flow/internal/FlowConsumerContextImpl.kt | 22 ++- .../opendc/simulator/flow/internal/FlowDeque.kt | 116 ++++++++++++ .../simulator/flow/internal/FlowEngineImpl.kt | 38 +--- .../simulator/flow/internal/FlowTimerQueue.kt | 195 +++++++++++++++++++++ 4 files changed, 329 insertions(+), 42 deletions(-) create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt create mode 100644 opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 9a568897..0baa7880 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -134,8 +134,8 @@ internal class FlowConsumerContextImpl( /** * The timers at which the context is scheduled to be interrupted. */ - private var _timer: FlowEngineImpl.Timer? = null - private val _pendingTimers: ArrayDeque = ArrayDeque(5) + private var _timer: Long = Long.MAX_VALUE + private val _pendingTimers: ArrayDeque = ArrayDeque(5) override fun start() { check(_flags and ConnState == ConnPending) { "Consumer is already started" } @@ -217,8 +217,8 @@ internal class FlowConsumerContextImpl( */ fun doUpdate( now: Long, - visited: ArrayDeque, - timerQueue: PriorityQueue, + visited: FlowDeque, + timerQueue: FlowTimerQueue, isImmediate: Boolean ) { var flags = _flags @@ -326,8 +326,7 @@ internal class FlowConsumerContextImpl( // Prune the head timer if this is a delayed update val timer = if (!isImmediate) { // Invariant: Any pending timer should only point to a future timestamp - // See also `scheduleDelayed` - val timer = pendingTimers.poll() + val timer = pendingTimers.poll() ?: Long.MAX_VALUE _timer = timer timer } else { @@ -342,7 +341,7 @@ internal class FlowConsumerContextImpl( if (newDeadline == Long.MAX_VALUE || flags and ConnState != ConnActive || flags and ConnDisableTimers != 0 || - (timer != null && newDeadline >= timer.target) + (timer != Long.MAX_VALUE && newDeadline >= timer) ) { // Ignore any deadline scheduled at the maximum value // This indicates that the source does not want to register a timer @@ -350,12 +349,11 @@ internal class FlowConsumerContextImpl( } // Construct a timer for the new deadline and add it to the global queue of timers - val newTimer = FlowEngineImpl.Timer(this, newDeadline) - _timer = newTimer - timerQueue.add(newTimer) + _timer = newDeadline + timerQueue.add(this, newDeadline) - // A timer already exists for this connection, so add it to the queue of pending timers - if (timer != null) { + // Slow-path: a timer already exists for this connection, so add it to the queue of pending timers + if (timer != Long.MAX_VALUE) { pendingTimers.addFirst(timer) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt new file mode 100644 index 00000000..c6cba4b7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowDeque.kt @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +import java.util.* + +/** + * A specialized [ArrayDeque] for [FlowConsumerContextImpl] implementations. + */ +internal class FlowDeque(initialCapacity: Int = 256) { + /** + * The array of elements in the queue. + */ + private var _elements: Array = arrayOfNulls(initialCapacity) + private var _head = 0 + private var _tail = 0 + + /** + * Determine whether this queue is not empty. + */ + fun isNotEmpty(): Boolean { + return _head != _tail + } + + /** + * Add the specified [ctx] to the queue. + */ + fun add(ctx: FlowConsumerContextImpl) { + val es = _elements + var tail = _tail + + es[tail] = ctx + + tail = inc(tail, es.size) + _tail = tail + + if (_head == tail) { + doubleCapacity() + } + } + + /** + * Remove a [FlowConsumerContextImpl] from the queue or `null` if the queue is empty. + */ + fun poll(): FlowConsumerContextImpl? { + val es = _elements + val head = _head + val ctx = es[head] + + if (ctx != null) { + es[head] = null + _head = inc(head, es.size) + } + + return ctx + } + + /** + * Clear the queue. + */ + fun clear() { + _elements.fill(null) + _head = 0 + _tail = 0 + } + + private fun inc(i: Int, modulus: Int): Int { + var x = i + if (++x >= modulus) { + x = 0 + } + return x + } + + /** + * Doubles the capacity of this deque + */ + private fun doubleCapacity() { + assert(_head == _tail) + val p = _head + val n = _elements.size + val r = n - p // number of elements to the right of p + + val newCapacity = n shl 1 + check(newCapacity >= 0) { "Sorry, deque too big" } + + val a = arrayOfNulls(newCapacity) + + _elements.copyInto(a, 0, p, r) + _elements.copyInto(a, r, 0, p) + + _elements = a + _head = 0 + _tail = n + } +} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 450556f8..55debef0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -48,12 +48,12 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /** * The queue of connection updates that are scheduled for immediate execution. */ - private val queue = ArrayDeque() + private val queue = FlowDeque() /** * A priority queue containing the connection updates to be scheduled in the future. */ - private val futureQueue = PriorityQueue() + private val futureQueue = FlowTimerQueue() /** * The stack of engine invocations to occur in the future. @@ -63,7 +63,7 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /** * The systems that have been visited during the engine cycle. */ - private val visited: ArrayDeque = ArrayDeque() + private val visited = FlowDeque() /** * The index in the batch stack. @@ -151,17 +151,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc // Execute all scheduled updates at current timestamp while (true) { - val timer = futureQueue.peek() ?: break - val target = timer.target - - if (target > now) { - break - } - - assert(target >= now) { "Internal inconsistency: found update of the past" } - - futureQueue.poll() - timer.ctx.doUpdate(now, visited, futureQueue, isImmediate = false) + val ctx = futureQueue.poll(now) ?: break + ctx.doUpdate(now, visited, futureQueue, isImmediate = false) } // Repeat execution of all immediate updates until the system has converged to a steady-state @@ -184,9 +175,9 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc } // Schedule an engine invocation for the next update to occur. - val headTimer = futureQueue.peek() - if (headTimer != null) { - trySchedule(now, futureInvocations, headTimer.target) + val headDeadline = futureQueue.peekDeadline() + if (headDeadline != Long.MAX_VALUE) { + trySchedule(now, futureInvocations, headDeadline) } } @@ -224,17 +215,4 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc */ fun cancel() = handle.dispose() } - - /** - * An update call for [ctx] that is scheduled for [target]. - * - * This class represents an update in the future at [target] requested by [ctx]. - */ - class Timer(@JvmField val ctx: FlowConsumerContextImpl, @JvmField val target: Long) : Comparable { - override fun compareTo(other: Timer): Int { - return target.compareTo(other.target) - } - - override fun toString(): String = "Timer[ctx=$ctx,timestamp=$target]" - } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt new file mode 100644 index 00000000..22a390e6 --- /dev/null +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowTimerQueue.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.simulator.flow.internal + +/** + * Specialized priority queue for flow timers. + */ +internal class FlowTimerQueue(initialCapacity: Int = 256) { + /** + * The binary heap of deadlines. + */ + private var _deadlines = LongArray(initialCapacity) { Long.MIN_VALUE } + + /** + * The binary heap of [FlowConsumerContextImpl]s. + */ + private var _pending = arrayOfNulls(initialCapacity) + + /** + * The number of elements in the priority queue. + */ + private var size = 0 + + /** + * Register a timer for [ctx] with [deadline]. + */ + fun add(ctx: FlowConsumerContextImpl, deadline: Long) { + val i = size + val deadlines = _deadlines + if (i >= deadlines.size) { + grow() + } + + siftUp(deadlines, _pending, i, ctx, deadline) + + size = i + 1 + } + + /** + * Update all pending [FlowConsumerContextImpl]s at the timestamp [now]. + */ + fun poll(now: Long): FlowConsumerContextImpl? { + if (size == 0) { + return null + } + + val deadlines = _deadlines + val deadline = deadlines[0] + + if (now < deadline) { + return null + } + + val pending = _pending + val res = pending[0] + val s = --size + + val nextDeadline = deadlines[s] + val next = pending[s]!! + + // Clear the last element of the queue + pending[s] = null + deadlines[s] = Long.MIN_VALUE + + if (s != 0) { + siftDown(deadlines, pending, next, nextDeadline) + } + + return res + } + + /** + * Find the earliest deadline in the queue. + */ + fun peekDeadline(): Long { + return if (size == 0) Long.MAX_VALUE else _deadlines[0] + } + + /** + * Increases the capacity of the array. + */ + private fun grow() { + val oldCapacity = _deadlines.size + // Double size if small; else grow by 50% + val newCapacity = oldCapacity + if (oldCapacity < 64) oldCapacity + 2 else oldCapacity shr 1 + + _deadlines = _deadlines.copyOf(newCapacity) + _pending = _pending.copyOf(newCapacity) + } + + /** + * Insert item [ctx] at position [pos], maintaining heap invariant by promoting [ctx] up the tree until it is + * greater than or equal to its parent, or is the root. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param pos The position to fill. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftUp( + deadlines: LongArray, + pending: Array, + pos: Int, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = pos + + while (k > 0) { + val parent = (k - 1) ushr 1 + val parentDeadline = deadlines[parent] + + if (deadline >= parentDeadline) { + break + } + + deadlines[k] = parentDeadline + pending[k] = pending[parent] + + k = parent + } + + deadlines[k] = deadline + pending[k] = ctx + } + + /** + * Inserts [ctx] at the top, maintaining heap invariant by demoting [ctx] down the tree repeatedly until it + * is less than or equal to its children or is a leaf. + * + * @param deadlines The heap of deadlines. + * @param pending The heap of contexts. + * @param ctx The [FlowConsumerContextImpl] to insert. + * @param deadline The deadline of the context. + */ + private fun siftDown( + deadlines: LongArray, + pending: Array, + ctx: FlowConsumerContextImpl, + deadline: Long + ) { + var k = 0 + val size = size + val half = size ushr 1 + + while (k < half) { + var child = (k shl 1) + 1 + + var childDeadline = deadlines[child] + val right = child + 1 + + if (right < size) { + val rightDeadline = deadlines[right] + + if (childDeadline > rightDeadline) { + child = right + childDeadline = rightDeadline + } + } + + if (deadline <= childDeadline) { + break + } + + deadlines[k] = childDeadline + pending[k] = pending[child] + + k = child + } + + deadlines[k] = deadline + pending[k] = ctx + } +} -- cgit v1.2.3 From e2f002358e9d5be2239fa2cb7ca92c9c96a21b6f Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Fri, 8 Oct 2021 17:10:45 +0200 Subject: perf(simulator): Eliminate clock access in hot path This change eliminates the clock calls in the hot path, by passing the current timestamp directly as method parameter. --- .../kotlin/org/opendc/simulator/flow/FlowConnection.kt | 7 +++++++ .../org/opendc/simulator/flow/FlowConsumerContext.kt | 4 +++- .../kotlin/org/opendc/simulator/flow/FlowForwarder.kt | 4 ++++ .../simulator/flow/internal/FlowConsumerContextImpl.kt | 12 +++++++----- .../opendc/simulator/flow/internal/FlowEngineImpl.kt | 5 +---- .../opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt | 18 ++++++++++-------- 6 files changed, 32 insertions(+), 18 deletions(-) (limited to 'opendc-simulator/opendc-simulator-flow/src') diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt index c327e1e9..8ff0bc76 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConnection.kt @@ -51,6 +51,13 @@ public interface FlowConnection : AutoCloseable { */ public fun pull() + /** + * Pull the source. + * + * @param now The timestamp at which the connection is pulled. + */ + public fun pull(now: Long) + /** * Push the given flow [rate] over this connection. * diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt index d7182497..98922ab3 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowConsumerContext.kt @@ -55,6 +55,8 @@ public interface FlowConsumerContext : FlowConnection { /** * Synchronously pull the source of the connection. + * + * @param now The timestamp at which the connection is pulled. */ - public fun pullSync() + public fun pullSync(now: Long) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7230a966..e3bdd7ba 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -72,6 +72,10 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled _innerCtx?.pull() } + override fun pull(now: Long) { + _innerCtx?.pull(now) + } + @JvmField var lastPull = Long.MAX_VALUE override fun push(rate: Double) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt index 0baa7880..58ca918b 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowConsumerContextImpl.kt @@ -164,17 +164,21 @@ internal class FlowConsumerContextImpl( } } - override fun pull() { + override fun pull(now: Long) { val flags = _flags if (flags and ConnState != ConnActive) { return } // Mark connection as pulled - scheduleImmediate(_clock.millis(), flags or ConnPulled) + scheduleImmediate(now, flags or ConnPulled) } - override fun pullSync() { + override fun pull() { + pull(_clock.millis()) + } + + override fun pullSync(now: Long) { val flags = _flags // Do not attempt to flush the connection if the connection is closed or an update is already active @@ -182,8 +186,6 @@ internal class FlowConsumerContextImpl( return } - val now = _clock.millis() - if (flags and (ConnPulled or ConnPushed) != 0 || _deadline == now) { engine.scheduleSync(now, this) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt index 55debef0..3c79d54e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowEngineImpl.kt @@ -129,11 +129,8 @@ internal class FlowEngineImpl(private val context: CoroutineContext, clock: Cloc /* Runnable */ override fun run() { - val now = _clock.millis() val invocation = futureInvocations.poll() // Clear invocation from future invocation queue - assert(now >= invocation.timestamp) { "Future invocations invariant violated" } - - doRunEngine(now) + doRunEngine(invocation.timestamp) } /** diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index eab5b299..a0fb8a4e 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -302,7 +302,7 @@ public class MaxMinFlowMultiplexer( // a few inputs and little changes at the same timestamp. // We always pick for option (1) unless there are no outputs available. if (activationOutput != null) { - activationOutput.pull() + activationOutput.pull(now) return } else { runScheduler(now) @@ -320,7 +320,7 @@ public class MaxMinFlowMultiplexer( return try { _schedulerActive = true - doRunScheduler(delta) + doRunScheduler(now, delta) } finally { _schedulerActive = false } @@ -371,7 +371,7 @@ public class MaxMinFlowMultiplexer( * * @return The deadline after which a new scheduling cycle should start. */ - private fun doRunScheduler(delta: Long): Long { + private fun doRunScheduler(now: Long, delta: Long): Long { val activeInputs = _activeInputs val activeOutputs = _activeOutputs var inputArray = _inputArray @@ -396,7 +396,8 @@ public class MaxMinFlowMultiplexer( // Pull in the work of the inputs for (i in 0 until inputSize) { val input = inputArray[i] - input.pullSync() + + input.pullSync(now) // Remove inputs that have finished if (!input.isActive) { @@ -595,8 +596,8 @@ public class MaxMinFlowMultiplexer( /** * Pull the source if necessary. */ - fun pullSync() { - _ctx?.pullSync() + fun pullSync(now: Long) { + _ctx?.pullSync(now) } /* FlowConsumer */ @@ -733,8 +734,8 @@ public class MaxMinFlowMultiplexer( /** * Pull this output. */ - fun pull() { - _conn?.pull() + fun pull(now: Long) { + _conn?.pull(now) } override fun onStart(conn: FlowConnection, now: Long) { @@ -772,6 +773,7 @@ public class MaxMinFlowMultiplexer( // Output is not the activation output, so trigger activation output and do not install timer for this // output (by returning `Long.MAX_VALUE`) scheduler.trigger(now) + Long.MAX_VALUE } } -- cgit v1.2.3