diff options
15 files changed, 114 insertions, 106 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index d7b7caad..368b0086 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -209,7 +209,7 @@ class CapelinIntegrationTest { { assertEquals(6028050, this@CapelinIntegrationTest.monitor.idleTime) { "Idle time incorrect" } }, { assertEquals(14712749, this@CapelinIntegrationTest.monitor.activeTime) { "Active time incorrect" } }, { assertEquals(12532907, this@CapelinIntegrationTest.monitor.stealTime) { "Steal time incorrect" } }, - { assertEquals(467963, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } + { assertEquals(477068, this@CapelinIntegrationTest.monitor.lostTime) { "Lost time incorrect" } } ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index 8e925bdf..9495095e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -30,7 +30,7 @@ import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceKey +import org.opendc.simulator.compute.kernel.interference.VmInterferenceKey import org.opendc.simulator.flow.mux.FlowMultiplexer import kotlin.math.roundToLong @@ -144,6 +144,10 @@ public abstract class SimAbstractHypervisor( if (delta > 0) { _counters.record() + + for (vm in _vms) { + vm._counters.record() + } } val load = cpuDemand / cpuCapacity @@ -171,19 +175,19 @@ public abstract class SimAbstractHypervisor( /** * The interference key of this virtual machine. */ - private val interferenceKey: InterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } + private val interferenceKey: VmInterferenceKey? = interferenceId?.let { interferenceDomain?.createKey(it) } /** * The vCPUs of the machine. */ - override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency, interferenceKey), cpu) } + override val cpus = model.cpus.map { cpu -> VCpu(mux, mux.newInput(cpu.frequency), cpu) } /** * The resource counters associated with the hypervisor. */ override val counters: SimHypervisorCounters get() = _counters - private val _counters = VmCountersImpl(cpus) + @JvmField val _counters = VmCountersImpl(cpus, interferenceDomain, interferenceKey) /** * The CPU capacity of the hypervisor in MHz. @@ -317,8 +321,8 @@ public abstract class SimAbstractHypervisor( override val cpuLostTime: Long get() = _cpuTime[3] - private val _cpuTime = LongArray(4) - private val _previous = DoubleArray(4) + val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) /** * Record the CPU time of the hypervisor. @@ -331,22 +335,18 @@ public abstract class SimAbstractHypervisor( val demand = counters.demand val actual = counters.actual val remaining = counters.remaining - val interference = counters.interference val demandDelta = demand - previous[0] val actualDelta = actual - previous[1] val remainingDelta = remaining - previous[2] - val interferenceDelta = interference - previous[3] previous[0] = demand previous[1] = actual previous[2] = remaining - previous[3] = interference cpuTime[0] += (actualDelta * d).roundToLong() cpuTime[1] += (remainingDelta * d).roundToLong() cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() - cpuTime[3] += (interferenceDelta * d).roundToLong() } override fun flush() { @@ -358,17 +358,71 @@ public abstract class SimAbstractHypervisor( /** * A [SimHypervisorCounters] implementation for a virtual machine. */ - private class VmCountersImpl(private val cpus: List<VCpu>) : SimHypervisorCounters { + private inner class VmCountersImpl( + private val cpus: List<VCpu>, + private val interferenceDomain: VmInterferenceDomain?, + private val key: VmInterferenceKey? + ) : SimHypervisorCounters { private val d = cpus.size / cpus.sumOf { it.model.frequency } * 1000 override val cpuActiveTime: Long - get() = (cpus.sumOf { it.counters.actual } * d).roundToLong() + get() = _cpuTime[0] override val cpuIdleTime: Long - get() = (cpus.sumOf { it.counters.remaining } * d).roundToLong() + get() = _cpuTime[1] override val cpuStealTime: Long - get() = (cpus.sumOf { it.counters.demand - it.counters.actual } * d).roundToLong() + get() = _cpuTime[2] override val cpuLostTime: Long - get() = (cpus.sumOf { it.counters.interference } * d).roundToLong() + get() = _cpuTime[3] + + private val _cpuTime = LongArray(4) + private val _previous = DoubleArray(3) + + /** + * Record the CPU time of the hypervisor. + */ + fun record() { + val cpuTime = _cpuTime + val previous = _previous + + var demand = 0.0 + var actual = 0.0 + var remaining = 0.0 + + for (cpu in cpus) { + val counters = cpu.counters + + actual += counters.actual + demand += counters.demand + remaining += counters.remaining + } + + val demandDelta = demand - previous[0] + val actualDelta = actual - previous[1] + val remainingDelta = remaining - previous[2] + + previous[0] = demand + previous[1] = actual + previous[2] = remaining + + val d = d + cpuTime[0] += (actualDelta * d).roundToLong() + cpuTime[1] += (remainingDelta * d).roundToLong() + cpuTime[2] += ((demandDelta - actualDelta) * d).roundToLong() + + // Compute the performance penalty due to flow interference + val interferenceDomain = interferenceDomain + if (interferenceDomain != null) { + val mux = mux + val load = mux.rate / mux.capacity.coerceAtLeast(1.0) + val penalty = 1 - interferenceDomain.apply(key, load) + val interference = (actualDelta * d * penalty).roundToLong() + + if (interference > 0) { + cpuTime[3] += interference + _counters._cpuTime[3] += interference + } + } + } override fun flush() { for (cpu in cpus) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index 36f76650..f6a700b9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -50,7 +50,7 @@ public class SimFairShareHypervisor( /** * The multiplexer that distributes the computing capacity. */ - override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this, interferenceDomain) + override val mux: FlowMultiplexer = MaxMinFlowMultiplexer(engine, this) override fun canFit(model: MachineModel): Boolean = true } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt index 09b03306..5220fa2d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceDomain.kt @@ -22,38 +22,45 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.flow.interference.InterferenceDomain -import org.opendc.simulator.flow.interference.InterferenceKey - /** * The interference domain of a hypervisor. */ -public interface VmInterferenceDomain : InterferenceDomain { +public interface VmInterferenceDomain { + /** + * Compute the performance score of a participant in this interference domain. + * + * @param key The participant to obtain the score of or `null` if the participant has no key. + * @param load The overall load on the interference domain. + * @return A score representing the performance score to be applied to the resource consumer, with 1 + * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. + */ + public fun apply(key: VmInterferenceKey?, load: Double): Double + /** - * Construct an [InterferenceKey] for the specified [id]. + * Construct an [VmInterferenceKey] for the specified [id]. * * @param id The identifier of the virtual machine. * @return A key identifying the virtual machine as part of the interference domain. `null` if the virtual machine * does not participate in the domain. */ - public fun createKey(id: String): InterferenceKey? + public fun createKey(id: String): VmInterferenceKey? /** * Remove the specified [key] from this domain. */ - public fun removeKey(key: InterferenceKey) + public fun removeKey(key: VmInterferenceKey) /** * Mark the specified [key] as active in this interference domain. * * @param key The key to join the interference domain with. */ - public fun join(key: InterferenceKey) + public fun join(key: VmInterferenceKey) /** * Mark the specified [key] as inactive in this interference domain. * * @param key The key of the virtual machine that wants to leave the domain. */ - public fun leave(key: InterferenceKey) + public fun leave(key: VmInterferenceKey) } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt index d28ebde5..8d720ea9 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceKey.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceKey.kt @@ -20,9 +20,9 @@ * SOFTWARE. */ -package org.opendc.simulator.flow.interference +package org.opendc.simulator.compute.kernel.interference /** * A key that uniquely identifies a participant of an interference domain. */ -public interface InterferenceKey +public interface VmInterferenceKey diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt index 977292be..7cc545c8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/interference/VmInterferenceModel.kt @@ -22,7 +22,6 @@ package org.opendc.simulator.compute.kernel.interference -import org.opendc.simulator.flow.interference.InterferenceKey import java.util.* /** @@ -215,12 +214,12 @@ public class VmInterferenceModel private constructor( */ private val activeKeys = ArrayList<InterferenceKeyImpl>() - override fun createKey(id: String): InterferenceKey? { + override fun createKey(id: String): VmInterferenceKey? { val intId = idMapping[id] ?: return null return keys.computeIfAbsent(intId) { InterferenceKeyImpl(intId) } } - override fun removeKey(key: InterferenceKey) { + override fun removeKey(key: VmInterferenceKey) { if (key !is InterferenceKeyImpl) { return } @@ -232,7 +231,7 @@ public class VmInterferenceModel private constructor( keys.remove(key.id) } - override fun join(key: InterferenceKey) { + override fun join(key: VmInterferenceKey) { if (key !is InterferenceKeyImpl) { return } @@ -246,14 +245,14 @@ public class VmInterferenceModel private constructor( } } - override fun leave(key: InterferenceKey) { + override fun leave(key: VmInterferenceKey) { if (key is InterferenceKeyImpl && key.release()) { activeKeys.remove(key) computeActiveGroups(key.id) } } - override fun apply(key: InterferenceKey?, load: Double): Double { + override fun apply(key: VmInterferenceKey?, load: Double): Double { if (key == null || key !is InterferenceKeyImpl) { return 1.0 } @@ -367,7 +366,7 @@ public class VmInterferenceModel private constructor( * * @param id The identifier of the member. */ - private class InterferenceKeyImpl(@JvmField val id: Int) : InterferenceKey, Comparable<InterferenceKeyImpl> { + private class InterferenceKeyImpl(@JvmField val id: Int) : VmInterferenceKey, Comparable<InterferenceKeyImpl> { /** * The active groups to which the key belongs. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt index 91855e8d..dd5bfc33 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisorTest.kt @@ -47,7 +47,6 @@ import org.opendc.simulator.flow.FlowEngine /** * Test suite for the [SimHypervisor] class. */ -@OptIn(ExperimentalCoroutinesApi::class) internal class SimFairShareHypervisorTest { private lateinit var model: MachineModel diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt index a717ae6e..d8ad7978 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowCounters.kt @@ -42,11 +42,6 @@ public interface FlowCounters { public val remaining: Double /** - * The accumulated flow lost due to interference between sources. - */ - public val interference: Double - - /** * Reset the flow counters. */ public fun reset() diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt index 0ad18f6a..6fa2971a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowForwarder.kt @@ -258,6 +258,6 @@ public class FlowForwarder( val work = _demand * deltaS val actualWork = ctx.rate * deltaS - counters.increment(work, actualWork, (total - actualWork), 0.0) + counters.increment(work, actualWork, (total - actualWork)) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt index d0324ce8..ee8cd739 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/FlowSink.kt @@ -154,7 +154,7 @@ public class FlowSink( val work = capacity * deltaS val actualWork = ctx.rate * deltaS - counters.increment(work, actualWork, (total - actualWork), 0.0) + counters.increment(work, actualWork, (total - actualWork)) } } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt deleted file mode 100644 index aa2713b6..00000000 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/interference/InterferenceDomain.kt +++ /dev/null @@ -1,19 +0,0 @@ -package org.opendc.simulator.flow.interference - -import org.opendc.simulator.flow.FlowSource - -/** - * An interference domain represents a system of flow stages where [flow sources][FlowSource] may incur - * performance variability due to operating on the same resource and therefore causing interference. - */ -public interface InterferenceDomain { - /** - * Compute the performance score of a participant in this interference domain. - * - * @param key The participant to obtain the score of or `null` if the participant has no key. - * @param load The overall load on the interference domain. - * @return A score representing the performance score to be applied to the resource consumer, with 1 - * meaning no influence, <1 means that performance degrades, and >1 means that performance improves. - */ - public fun apply(key: InterferenceKey?, load: Double): Double -} diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt index d990dc61..c320a362 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/internal/MutableFlowCounters.kt @@ -34,23 +34,20 @@ public class MutableFlowCounters : FlowCounters { get() = _counters[1] override val remaining: Double get() = _counters[2] - override val interference: Double - get() = _counters[3] - private val _counters = DoubleArray(4) + private val _counters = DoubleArray(3) override fun reset() { _counters.fill(0.0) } - public fun increment(demand: Double, actual: Double, remaining: Double, interference: Double) { + public fun increment(demand: Double, actual: Double, remaining: Double) { val counters = _counters counters[0] += demand counters[1] += actual counters[2] += remaining - counters[3] += interference } override fun toString(): String { - return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining,interference=$interference]" + return "FlowCounters[demand=$demand,actual=$actual,remaining=$remaining]" } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt index 5f198944..3b4583e2 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/FlowMultiplexer.kt @@ -25,7 +25,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.FlowConsumer import org.opendc.simulator.flow.FlowCounters import org.opendc.simulator.flow.FlowSource -import org.opendc.simulator.flow.interference.InterferenceKey /** * A [FlowMultiplexer] enables multiplexing multiple [FlowSource]s over possibly multiple [FlowConsumer]s. @@ -63,18 +62,15 @@ public interface FlowMultiplexer { /** * Create a new input on this multiplexer with a coupled capacity. - * - * @param key The key of the interference member to which the input belongs. */ - public fun newInput(key: InterferenceKey? = null): FlowConsumer + public fun newInput(): FlowConsumer /** * Create a new input on this multiplexer with the specified [capacity]. * * @param capacity The capacity of the input. - * @param key The key of the interference member to which the input belongs. */ - public fun newInput(capacity: Double, key: InterferenceKey? = null): FlowConsumer + public fun newInput(capacity: Double): FlowConsumer /** * Remove [input] from this multiplexer. diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt index 1d7d22ef..c18b2701 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/ForwardingFlowMultiplexer.kt @@ -23,7 +23,6 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceKey import java.util.ArrayDeque /** @@ -54,8 +53,6 @@ public class ForwardingFlowMultiplexer( get() = _outputs.sumOf { it.forwarder.counters.actual } override val remaining: Double get() = _outputs.sumOf { it.forwarder.counters.remaining } - override val interference: Double - get() = _outputs.sumOf { it.forwarder.counters.interference } override fun reset() { for (output in _outputs) { @@ -75,14 +72,14 @@ public class ForwardingFlowMultiplexer( override val capacity: Double get() = _outputs.sumOf { it.forwarder.capacity } - override fun newInput(key: InterferenceKey?): FlowConsumer { + override fun newInput(): FlowConsumer { val output = checkNotNull(_availableOutputs.poll()) { "No capacity to serve request" } val input = Input(output) _inputs += input return input } - override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer = newInput(key) + override fun newInput(capacity: Double): FlowConsumer = newInput() override fun removeInput(input: FlowConsumer) { if (!_inputs.remove(input)) { diff --git a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt index cc831862..7605f67d 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt +++ b/opendc-simulator/opendc-simulator-flow/src/main/kotlin/org/opendc/simulator/flow/mux/MaxMinFlowMultiplexer.kt @@ -23,11 +23,8 @@ package org.opendc.simulator.flow.mux import org.opendc.simulator.flow.* -import org.opendc.simulator.flow.interference.InterferenceDomain -import org.opendc.simulator.flow.interference.InterferenceKey import org.opendc.simulator.flow.internal.D_MS_TO_S import org.opendc.simulator.flow.internal.MutableFlowCounters -import kotlin.math.max import kotlin.math.min /** @@ -35,12 +32,10 @@ import kotlin.math.min * * @param engine The [FlowEngine] to drive the flow simulation. * @param parent The parent flow system of the multiplexer. - * @param interferenceDomain The interference domain of the multiplexer. */ public class MaxMinFlowMultiplexer( private val engine: FlowEngine, - parent: FlowConvergenceListener? = null, - private val interferenceDomain: InterferenceDomain? = null + parent: FlowConvergenceListener? = null ) : FlowMultiplexer { /** * The inputs of the multiplexer. @@ -85,16 +80,16 @@ public class MaxMinFlowMultiplexer( */ private val scheduler = Scheduler(engine, parent) - override fun newInput(key: InterferenceKey?): FlowConsumer { - return newInput(isCoupled = true, Double.POSITIVE_INFINITY, key) + override fun newInput(): FlowConsumer { + return newInput(isCoupled = true, Double.POSITIVE_INFINITY) } - override fun newInput(capacity: Double, key: InterferenceKey?): FlowConsumer { - return newInput(isCoupled = false, capacity, key) + override fun newInput(capacity: Double): FlowConsumer { + return newInput(isCoupled = false, capacity) } - private fun newInput(isCoupled: Boolean, initialCapacity: Double, key: InterferenceKey?): FlowConsumer { - val provider = Input(engine, scheduler, interferenceDomain, key, isCoupled, initialCapacity) + private fun newInput(isCoupled: Boolean, initialCapacity: Double): FlowConsumer { + val provider = Input(engine, scheduler, isCoupled, initialCapacity) _inputs.add(provider) return provider } @@ -499,8 +494,7 @@ public class MaxMinFlowMultiplexer( counters.increment( demand = demand * deltaS, actual = rate * deltaS, - remaining = (previousCapacity - rate) * deltaS, - interference = 0.0 + remaining = (previousCapacity - rate) * deltaS ) } } @@ -511,8 +505,6 @@ public class MaxMinFlowMultiplexer( private class Input( private val engine: FlowEngine, private val scheduler: Scheduler, - private val interferenceDomain: InterferenceDomain?, - @JvmField val key: InterferenceKey?, @JvmField val isCoupled: Boolean, initialCapacity: Double, ) : FlowConsumer, FlowConsumerLogic, Comparable<Input> { @@ -693,24 +685,15 @@ public class MaxMinFlowMultiplexer( return } - // Compute the performance penalty due to flow interference - val perfScore = if (interferenceDomain != null) { - val load = scheduler.rate / scheduler.capacity - interferenceDomain.apply(key, load) - } else { - 1.0 - } - val actualRate = actualRate val deltaS = delta * D_MS_TO_S val demand = limit * deltaS val actual = actualRate * deltaS val remaining = (_capacity - actualRate) * deltaS - val interference = actual * max(0.0, 1 - perfScore) - _counters.increment(demand, actual, remaining, interference) - scheduler.counters.increment(0.0, 0.0, 0.0, interference) + _counters.increment(demand, actual, remaining) + scheduler.counters.increment(0.0, 0.0, 0.0) } } |
