diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-07 16:26:00 +0200 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-04-07 20:38:46 +0200 |
| commit | 4e9f72b50473d73f9ca9e30a7fbeb97a8a1c0555 (patch) | |
| tree | 6d4855c6a93cfc41064e73b169e2f39d5530a5ae /simulator/opendc-simulator/opendc-simulator-compute/src/main | |
| parent | 95a0ed6911f136fb25bb76d6b6e010bf66b8ba5b (diff) | |
simulator: Move away from StateFlow for low-level monitoring
This change removes the StateFlow speed property on the
SimResourceSource, as the overhead of emitting changes to the StateFlow
is too high in a single-thread context. Our new approach is to use
direct callbacks and counters.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute/src/main')
4 files changed, 31 insertions, 26 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 2127b066..1f26c9c9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource import org.opendc.simulator.resources.consume +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -47,9 +46,9 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The speed of the CPU cores. */ - public val speed: List<Double> + public val speed: DoubleArray get() = _speed - private var _speed = mutableListOf<Double>() + private var _speed = doubleArrayOf() /** * A flag to indicate that the machine is terminated. @@ -94,29 +93,32 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } - _speed = MutableList(model.cpus.size) { 0.0 } + _speed = DoubleArray(model.cpus.size) { 0.0 } + var totalSpeed = 0.0 workload.onStart(ctx) for ((cpu, source) in resources) { val consumer = workload.getConsumer(ctx, cpu) - val job = source.speed - .onEach { - _speed[cpu.id] = it - _usage.value = _speed.sum() / totalCapacity - } - .launchIn(this) - - launch { - try { - source.consume(consumer) - } finally { - job.cancel() - } + val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed -> + val oldSpeed = _speed[cpu.id] + _speed[cpu.id] = newSpeed + totalSpeed = totalSpeed - oldSpeed + newSpeed + + updateUsage(totalSpeed / totalCapacity) } + + launch { source.consume(adapter) } } } + /** + * This method is invoked when the usage of the machine is updated. + */ + protected open fun updateUsage(usage: Double) { + _usage.value = usage + } + override fun close() { if (!isTerminated) { isTerminated = true diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 51b807d2..d5577279 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,12 +84,15 @@ public class SimBareMetalMachine( /** * The power draw of the machine. */ - public val powerDraw: StateFlow<Double> = usage - .map { - this.scalingGovernors.forEach { it.onLimit() } - this.scalingDriver.computePower() - } - .stateIn(scope, SharingStarted.Eagerly, 0.0) + public var powerDraw: Double = 0.0 + private set + + override fun updateUsage(usage: Double) { + super.updateUsage(usage) + + scalingGovernors.forEach { it.onLimit() } + powerDraw = scalingDriver.computePower() + } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt index b4bbf9fb..4d62c383 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt @@ -28,7 +28,7 @@ package org.opendc.simulator.compute.cpufreq public class DemandScalingGovernor : ScalingGovernor { override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { override fun onLimit() { - ctx.setTarget(ctx.resource.speed.value) + ctx.setTarget(ctx.resource.speed) } override fun toString(): String = "DemandScalingGovernor.Logic" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt index d109e4d8..1c82253c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt @@ -59,7 +59,7 @@ public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDrive for (ctx in contexts) { targetFreq = max(ctx.target, targetFreq) - totalSpeed += ctx.resource.speed.value + totalSpeed += ctx.resource.speed } val maxFreq = states.lastKey() |
