diff options
Diffstat (limited to 'opendc-simulator')
| -rw-r--r-- | opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt | 90 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt | 26 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt | 9 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt | 12 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt (renamed from opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt) | 28 | ||||
| -rw-r--r-- | opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt | 147 |
6 files changed, 188 insertions, 124 deletions
diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index aac8b959..f6d8f628 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -66,45 +66,7 @@ public abstract class SimAbstractHypervisor( */ public override val counters: SimHypervisorCounters get() = _counters - private val _counters = object : SimHypervisorCounters { - @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity - - override var cpuActiveTime: Long = 0L - override var cpuIdleTime: Long = 0L - override var cpuStealTime: Long = 0L - override var cpuLostTime: Long = 0L - - private var _previousDemand = 0.0 - private var _previousActual = 0.0 - private var _previousRemaining = 0.0 - private var _previousInterference = 0.0 - - /** - * Record the CPU time of the hypervisor. - */ - fun record() { - val counters = mux.counters - val demand = counters.demand - val actual = counters.actual - val remaining = counters.remaining - val interference = counters.interference - - val demandDelta = demand - _previousDemand - val actualDelta = actual - _previousActual - val remainingDelta = remaining - _previousRemaining - val interferenceDelta = interference - _previousInterference - - _previousDemand = demand - _previousActual = actual - _previousRemaining = remaining - _previousInterference = interference - - cpuActiveTime += (actualDelta * d).roundToLong() - cpuIdleTime += (remainingDelta * d).roundToLong() - cpuStealTime += ((demandDelta - actualDelta) * d).roundToLong() - cpuLostTime += (interferenceDelta * d).roundToLong() - } - } + private val _counters = CountersImpl(this) /** * The CPU capacity of the hypervisor in MHz. @@ -204,7 +166,7 @@ public abstract class SimAbstractHypervisor( get() = (cpus.sumOf { it.counters.actual + it.counters.remaining } * d).roundToLong() override val cpuStealTime: Long get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() - override val cpuLostTime: Long = 0L + override val cpuLostTime: Long = (cpus.sumOf { it.counters.interference } * d).roundToLong() } /** @@ -277,4 +239,52 @@ public abstract class SimAbstractHypervisor( override val min: Double = 0.0 } + + /** + * Implementation of [SimHypervisorCounters]. + */ + private class CountersImpl(private val hv: SimAbstractHypervisor) : SimHypervisorCounters { + @JvmField var d = 1.0 // Number of CPUs divided by total CPU capacity + + override val cpuActiveTime: Long + get() = _cpuTime[0] + override val cpuIdleTime: Long + get() = _cpuTime[1] + override val cpuStealTime: Long + get() = _cpuTime[2] + override val cpuLostTime: Long + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(4) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + val counters = hv.mux.counters + + val demand = counters.demand + val actual = counters.actual + val remaining = counters.remaining + val interference = counters.interference + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + val interferenceDelta = interference - previous[3] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + previous[3] = interference + + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + cpuTime[3] += (interferenceDelta * d).roundToLong() + } + } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt index b02426e3..5f1057e8 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/AbstractFlowConsumer.kt @@ -22,7 +22,7 @@ package org.opendc.simulator.flow -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters /** * Abstract implementation of the [FlowConsumer] which can be re-used by other implementations. @@ -56,13 +56,6 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi get() = ctx?.demand ?: 0.0 /** - * The flow counters to track the flow metrics of the consumer. - */ - public override val counters: FlowCounters - get() = _counters - private val _counters = FlowCountersImpl() - - /** * The [FlowConsumerContext] that is currently running. */ protected var ctx: FlowConsumerContext? = null @@ -89,7 +82,7 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi /** * Update the counters of the flow consumer. */ - protected fun updateCounters(ctx: FlowConnection, delta: Long) { + protected fun MutableFlowCounters.update(ctx: FlowConnection, delta: Long) { val demand = _previousDemand val capacity = _previousCapacity @@ -100,25 +93,12 @@ public abstract class AbstractFlowConsumer(private val engine: FlowEngine, initi return } - val counters = _counters val deltaS = delta / 1000.0 val total = demand * deltaS val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) - } - - /** - * Update the counters of the flow consumer. - */ - protected fun updateCounters(demand: Double, actual: Double, remaining: Double) { - val counters = _counters - counters.demand += demand - counters.actual += actual - counters.remaining += remaining + increment(work, actualWork, (total - actualWork), 0.0) } final override fun startConsumer(source: FlowSource) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 7eaaf6c2..229fd96a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -23,7 +23,7 @@ package org.opendc.simulator.flow import mu.KotlinLogging -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max /** @@ -117,7 +117,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled override val counters: FlowCounters get() = _counters - private val _counters = FlowCountersImpl() + private val _counters = MutableFlowCounters() override fun startConsumer(source: FlowSource) { check(delegate == null) { "Forwarder already active" } @@ -245,8 +245,7 @@ public class FlowForwarder(private val engine: FlowEngine, private val isCoupled val total = ctx.capacity * deltaS val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.demand += work - counters.actual += actualWork - counters.remaining += (total - actualWork) + + counters.increment(work, actualWork, (total - actualWork), 0.0) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index b4eb6a38..170ab1c0 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.flow +import org.opendc.simulator.flow.internal.MutableFlowCounters + /** * A [FlowSink] represents a sink with a fixed capacity. * @@ -34,6 +36,12 @@ public class FlowSink( initialCapacity: Double, private val parent: FlowConvergenceListener? = null ) : AbstractFlowConsumer(engine, initialCapacity) { + /** + * The flow counters to track the flow metrics of the consumer. + */ + public override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() override fun start(ctx: FlowConsumerContext) { if (parent != null) { @@ -52,11 +60,11 @@ public class FlowSink( delta: Long, rate: Double ) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) } override fun onFinish(ctx: FlowConsumerContext, now: Long, delta: Long, cause: Throwable?) { - updateCounters(ctx, delta) + _counters.update(ctx, delta) cancel() } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt index d2fa5228..d990dc61 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/FlowCountersImpl.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -27,17 +27,27 @@ import org.opendc.simulator.flow.FlowCounters /** * Mutable implementation of the [FlowCounters] interface. */ -internal class FlowCountersImpl : FlowCounters { - override var demand: Double = 0.0 - override var actual: Double = 0.0 - override var remaining: Double = 0.0 - override var interference: Double = 0.0 +public class MutableFlowCounters : FlowCounters { + override val demand: Double + get() = _counters[0] + override val actual: Double + get() = _counters[1] + override val remaining: Double + get() = _counters[2] + override val interference: Double + get() = _counters[3] + private val _counters = DoubleArray(4) override fun reset() { - demand = 0.0 - actual = 0.0 - remaining = 0.0 - interference = 0.0 + _counters.fill(0.0) + } + + public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + val counters = _counters + counters[0] += demand + counters[1] += actual + counters[2] += remaining + counters[3] += interference } override fun toString(): String { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index 9131eb54..eaa3f7c5 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -25,7 +25,7 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* import org.opendc.simulator.flow.interference.InterferenceDomain import org.opendc.simulator.flow.interference.InterferenceKey -import org.opendc.simulator.flow.internal.FlowCountersImpl +import org.opendc.simulator.flow.internal.MutableFlowCounters import kotlin.math.max import kotlin.math.min @@ -85,7 +85,7 @@ public class MaxMinFlowMultiplexer( private val scheduler = Scheduler(engine, parent) override fun newInput(key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key) + val provider = Input(engine, scheduler, interferenceDomain, key, scheduler.capacity) _inputs.add(provider) return provider } @@ -135,11 +135,11 @@ public class MaxMinFlowMultiplexer( /** * Helper class containing the scheduler state. */ - private class Scheduler(private val engine: FlowEngine, private val parent: FlowConvergenceListener?) { + private class Scheduler(engine: FlowEngine, private val parent: FlowConvergenceListener?) { /** * The flow counters of this scheduler. */ - @JvmField val counters = FlowCountersImpl() + @JvmField val counters = MutableFlowCounters() /** * The flow rate of the multiplexer. @@ -184,6 +184,11 @@ public class MaxMinFlowMultiplexer( private var _lastConvergeInput: Input? = null /** + * The simulation clock. + */ + private val _clock = engine.clock + + /** * Register the specified [input] to this scheduler. */ fun registerInput(input: Input) { @@ -195,7 +200,7 @@ public class MaxMinFlowMultiplexer( input.shouldConsumerConverge = !hasActivationOutput input.enableTimers = !hasActivationOutput input.capacity = capacity - trigger(engine.clock.millis()) + trigger(_clock.millis()) } /** @@ -447,10 +452,15 @@ public class MaxMinFlowMultiplexer( } val deltaS = delta / 1000.0 - - counters.demand += demand * deltaS - counters.actual += rate * deltaS - counters.remaining += (previousCapacity - rate) * deltaS + val demand = demand + val rate = rate + + counters.increment( + demand = demand * deltaS, + actual = rate * deltaS, + remaining = (previousCapacity - rate) * deltaS, + interference = 0.0 + ) } } @@ -458,41 +468,48 @@ public class MaxMinFlowMultiplexer( * An internal [FlowConsumer] implementation for multiplexer inputs. */ private class Input( - engine: FlowEngine, + private val engine: FlowEngine, private val scheduler: Scheduler, private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey? - ) : AbstractFlowConsumer(engine, scheduler.capacity), FlowConsumerLogic, Comparable<Input> { - /** - * The requested limit. - */ - @JvmField var limit: Double = 0.0 - + @JvmField val key: InterferenceKey?, + initialCapacity: Double, + ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { /** - * The actual processing speed. + * A flag to indicate that the consumer is active. */ - @JvmField var actualRate: Double = 0.0 + override val isActive: Boolean + get() = _ctx != null /** - * The processing rate that is allowed by the model constraints. + * The demand of the consumer. */ - @JvmField var allowedRate: Double = 0.0 + override val demand: Double + get() = limit /** - * The deadline of the input. + * The processing rate of the consumer. */ - val deadline: Long - get() = ctx?.deadline ?: Long.MAX_VALUE + override val rate: Double + get() = actualRate /** * The capacity of the input. */ override var capacity: Double - get() = super.capacity + get() = _capacity set(value) { allowedRate = min(limit, value) - super.capacity = value + _capacity = value + _ctx?.capacity = value } + private var _capacity = initialCapacity + + /** + * The flow counters to track the flow metrics of the consumer. + */ + override val counters: FlowCounters + get() = _counters + private val _counters = MutableFlowCounters() /** * A flag to enable timers for the input. @@ -500,7 +517,7 @@ public class MaxMinFlowMultiplexer( var enableTimers: Boolean = true set(value) { field = value - ctx?.enableTimers = value + _ctx?.enableTimers = value } /** @@ -509,10 +526,36 @@ public class MaxMinFlowMultiplexer( var shouldConsumerConverge: Boolean = true set(value) { field = value - ctx?.shouldConsumerConverge = value + _ctx?.shouldConsumerConverge = value } /** + * The requested limit. + */ + @JvmField var limit: Double = 0.0 + + /** + * The actual processing speed. + */ + @JvmField var actualRate: Double = 0.0 + + /** + * The processing rate that is allowed by the model constraints. + */ + @JvmField var allowedRate: Double = 0.0 + + /** + * The deadline of the input. + */ + val deadline: Long + get() = _ctx?.deadline ?: Long.MAX_VALUE + + /** + * The [FlowConsumerContext] that is currently running. + */ + private var _ctx: FlowConsumerContext? = null + + /** * A flag to indicate that the input is closed. */ private var _isClosed: Boolean = false @@ -527,13 +570,33 @@ public class MaxMinFlowMultiplexer( cancel() } - /* AbstractFlowConsumer */ - override fun createLogic(): FlowConsumerLogic = this + /** + * Pull the source if necessary. + */ + fun pullSync() { + _ctx?.pullSync() + } - override fun start(ctx: FlowConsumerContext) { + /* FlowConsumer */ + override fun startConsumer(source: FlowSource) { check(!_isClosed) { "Cannot re-use closed input" } + check(_ctx == null) { "Consumer is in invalid state" } + + val ctx = engine.newContext(source, this) + _ctx = ctx + + ctx.capacity = capacity scheduler.registerInput(this) - super.start(ctx) + + ctx.start() + } + + override fun pull() { + _ctx?.pull() + } + + override fun cancel() { + _ctx?.close() } /* FlowConsumerLogic */ @@ -562,8 +625,7 @@ public class MaxMinFlowMultiplexer( scheduler.deregisterInput(this, now) - // BUG: Cancel the connection so that `ctx` is set to `null` - cancel() + _ctx = null } override fun onConverge(ctx: FlowConsumerContext, now: Long, delta: Long) { @@ -574,13 +636,6 @@ public class MaxMinFlowMultiplexer( override fun compareTo(other: Input): Int = allowedRate.compareTo(other.allowedRate) /** - * Pull the source if necessary. - */ - fun pullSync() { - ctx?.pullSync() - } - - /** * Helper method to update the flow counters of the multiplexer. */ private fun doUpdateCounters(delta: Long) { @@ -596,14 +651,16 @@ public class MaxMinFlowMultiplexer( 1.0 } + val actualRate = actualRate + val deltaS = delta / 1000.0 val demand = limit * deltaS val actual = actualRate * deltaS - val remaining = (capacity - actualRate) * deltaS - - updateCounters(demand, actual, remaining) + val remaining = (_capacity - actualRate) * deltaS + val interference = actual * max(0.0, 1 - perfScore) - scheduler.counters.interference += actual * max(0.0, 1 - perfScore) + _counters.increment(demand, actual, remaining, interference) + scheduler.counters.increment(0.0, 0.0, 0.0, interference) } } |
