diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-07 14:39:03 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-10-08 17:11:52 +0200 |
| commit | 774ed886ac8f84ae2974c1204534ee332d920864 (patch) | |
| tree | e390f4393d194d435c75a64fcb45b9d52d4123d1 /opendc-simulator/opendc-simulator-flow | |
| parent | a0340a8752c4c4ed8413944b1dfb81b9481b6556 (diff) | |
fix(simulator): Count interference for multiplexer inputs
This change updates the SimAbstractHypervisor and MaxMinFlowMultiplexer
to count interference of multiplexer inputs, instead of only counting
them for the scheduler as a whole.
Diffstat (limited to 'opendc-simulator/opendc-simulator-flow')
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt | 26 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt | 9 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt | 12 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt) | 28 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt | 147 |
5 files changed, 138 insertions, 84 deletions
diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index b02426e3..5f1057e8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.flow -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters /** * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. @@ -56,13 +56,6 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi get() = ctx?.demand ?: 0.0 /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() - - /** * The [FlowConsumerContext] that is currently running. */ protected var ctx: FlowConsumerContext? = null @@ -89,7 +82,7 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * Update the counters of the flow consumer. */ - protected fun updateCounters(ctx: FlowConnection, delta: Long) { + protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { val demand = _previousDemand val capacity = _previousCapacity @@ -100,25 +93,12 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi return } - val counters = _counters val deltaS = delta / 1000.0 val total = demand * deltaS val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) - } - - /** - * Update the counters of the flow consumer. - */ - protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters.demand += demand - counters.actual += actual - counters.remaining += remaining + increment(work, actualWork, (total - actualWork), 0.0) } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7eaaf6c2..229fd96a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max /** @@ -117,7 +117,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override val counters: FlowCounters get() = _counters - private val _counters = FlowCountersImpl() + private val _counters = MutableFlowCounters() override fun startConsumer(source: FlowSource) { check(delegate == null) { "Forwarder already active" } @@ -245,8 +245,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index b4eb6a38..170ab1c0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.MutableFlowCounters + /** * A [FlowSink] represents a sink with a fixed capacity. * @@ -34,6 +36,12 @@ public class FlowSink( initialCapacity: Double, private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() override fun start(ctx: FlowConsumerContext) { if (parent != null) { @@ -52,11 +60,11 @@ public class FlowSink( delta: Long, rate: Double ) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt index d2fa5228..d990dc61 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -27,17 +27,27 @@ import org.opendc.simulator.flow.FlowCounters /** * Mutable implementation of the [FlowCounters] interface. */ -internal class FlowCountersImpl : FlowCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var remaining: Double = 0.0 - override var interference: Double = 0.0 +public class MutableFlowCounters : FlowCounters { + override val demand: Double + get() = _counters[0] + override val actual: Double + get() = _counters[1] + override val remaining: Double + get() = _counters[2] + override val interference: Double + get() = _counters[3] + private val _counters = DoubleArray(4) override fun reset() { - demand = 0.0 - actual = 0.0 - remaining = 0.0 - interference = 0.0 + _counters.fill(0.0) + } + + public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + val counters = _counters + counters[0] += demand + counters[1] += actual + counters[2] += remaining + counters[3] += interference } override fun toString(): String { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 9131eb54..eaa3f7c5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -85,7 +85,7 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key) + val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) _inputs.add(provider) return provider } @@ -135,11 +135,11 @@ public class MaxMinFlowMultiplexer( /** * Helper class containing the scheduler state. */ - private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { /** * The flow counters of this scheduler. */ - @JvmField val counters = FlowCountersImpl() + @JvmField val counters = MutableFlowCounters() /** * The flow rate of the multiplexer. @@ -184,6 +184,11 @@ public class MaxMinFlowMultiplexer( private var _lastConvergeInput: Input? = null /** + * The simulation clock. + */ + private val _clock = engine.clock + + /** * Register the specified [input] to this scheduler. */ fun registerInput(input: Input) { @@ -195,7 +200,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity - trigger(engine.clock.millis()) + trigger(_clock.millis()) } /** @@ -447,10 +452,15 @@ public class MaxMinFlowMultiplexer( } val deltaS = delta / 1000.0 - - counters.demand += demand * deltaS - counters.actual += rate * deltaS - counters.remaining += (previousCapacity - rate) * deltaS + val demand = demand + val rate = rate + + counters.increment( + demand = demand * deltaS, + actual = rate * deltaS, + remaining = (previousCapacity - rate) * deltaS, + interference = 0.0 + ) } } @@ -458,41 +468,48 @@ public class MaxMinFlowMultiplexer( * An internal [FlowConsumer] implementation for multiplexer inputs. */ private class Input( - engine: FlowEngine, + private val engine: FlowEngine, private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey? - ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> { - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - + @JvmField val key: InterferenceKey?, + initialCapacity: Double, + ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { /** - * The actual processing speed. + * A flag to indicate that the consumer is active. */ - @JvmField var actualRate: Double = 0.0 + override val isActive: Boolean + get() = _ctx != null /** - * The processing rate that is allowed by the model constraints. + * The demand of the consumer. */ - @JvmField var allowedRate: Double = 0.0 + override val demand: Double + get() = limit /** - * The deadline of the input. + * The processing rate of the consumer. */ - val deadline: Long - get() = ctx?.deadline ?: Long.MAX_VALUE + override val rate: Double + get() = actualRate /** * The capacity of the input. */ override var capacity: Double - get() = super.capacity + get() = _capacity set(value) { allowedRate = min(limit, value) - super.capacity = value + _capacity = value + _ctx?.capacity = value } + private var _capacity = initialCapacity + + /** + * The flow counters to track the flow metrics of the consumer. + */ + override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() /** * A flag to enable timers for the input. @@ -500,7 +517,7 @@ public class MaxMinFlowMultiplexer( var enableTimers: Boolean = true set(value) { field = value - ctx?.enableTimers = value + _ctx?.enableTimers = value } /** @@ -509,10 +526,36 @@ public class MaxMinFlowMultiplexer( var shouldConsumerConverge: Boolean = true set(value) { field = value - ctx?.shouldConsumerConverge = value + _ctx?.shouldConsumerConverge = value } /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + + /** + * The deadline of the input. + */ + val deadline: Long + get() = _ctx?.deadline ?: Long.MAX_VALUE + + /** + * The [FlowConsumerContext] that is currently running. + */ + private var _ctx: FlowConsumerContext? = null + + /** * A flag to indicate that the input is closed. */ private var _isClosed: Boolean = false @@ -527,13 +570,33 @@ public class MaxMinFlowMultiplexer( cancel() } - /* AbstractFlowConsumer */ - override fun createLogic(): FlowConsumerLogic = this + /** + * Pull the source if necessary. + */ + fun pullSync() { + _ctx?.pullSync() + } - override fun start(ctx: FlowConsumerContext) { + /* FlowConsumer */ + override fun startConsumer(source: FlowSource) { check(!_isClosed) { "Cannot re-use closed input" } + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, this) + _ctx = ctx + + ctx.capacity = capacity scheduler.registerInput(this) - super.start(ctx) + + ctx.start() + } + + override fun pull() { + _ctx?.pull() + } + + override fun cancel() { + _ctx?.close() } /* FlowConsumerLogic */ @@ -562,8 +625,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterInput(this, now) - // BUG: Cancel the connection so that `ctx` is set to `null` - cancel() + _ctx = null } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -574,13 +636,6 @@ public class MaxMinFlowMultiplexer( override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) /** - * Pull the source if necessary. - */ - fun pullSync() { - ctx?.pullSync() - } - - /** * Helper method to update the flow counters of the multiplexer. */ private fun doUpdateCounters(delta: Long) { @@ -596,14 +651,16 @@ public class MaxMinFlowMultiplexer( 1.0 } + val actualRate = actualRate + val deltaS = delta / 1000.0 val demand = limit * deltaS val actual = actualRate * deltaS - val remaining = (capacity - actualRate) * deltaS - - updateCounters(demand, actual, remaining) + val remaining = (_capacity - actualRate) * deltaS + val interference = actual * max(0.0, 1 - perfScore) - scheduler.counters.interference += actual * max(0.0, 1 - perfScore) + _counters.increment(demand, actual, remaining, interference) + scheduler.counters.increment(0.0, 0.0, 0.0, interference) } } |
