From 4e9f72b50473d73f9ca9e30a7fbeb97a8a1c0555 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 7 Apr 2021 16:26:00 +0200 Subject: simulator: Move away from StateFlow for low-level monitoring This change removes the StateFlow speed property on the SimResourceSource, as the overhead of emitting changes to the StateFlow is too high in a single-thread context. Our new approach is to use direct callbacks and counters. --- .../opendc/simulator/compute/SimAbstractMachine.kt | 38 ++++++++++++---------- .../simulator/compute/SimBareMetalMachine.kt | 15 +++++---- .../compute/cpufreq/DemandScalingGovernor.kt | 2 +- .../compute/cpufreq/PStateScalingDriver.kt | 2 +- .../compute/cpufreq/DemandScalingGovernorTest.kt | 2 +- .../compute/cpufreq/PStateScalingDriverTest.kt | 8 ++--- 6 files changed, 36 insertions(+), 31 deletions(-) (limited to 'simulator/opendc-simulator/opendc-simulator-compute') diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt index 2127b066..1f26c9c9 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt @@ -25,14 +25,13 @@ package org.opendc.simulator.compute import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.resources.SimResourceProvider import org.opendc.simulator.resources.SimResourceSource import org.opendc.simulator.resources.consume +import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter import java.time.Clock import kotlin.coroutines.CoroutineContext @@ -47,9 +46,9 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine /** * The speed of the CPU cores. */ - public val speed: List + public val speed: DoubleArray get() = _speed - private var _speed = mutableListOf() + private var _speed = doubleArrayOf() /** * A flag to indicate that the machine is terminated. @@ -94,29 +93,32 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine val ctx = Context(resources, meta) val totalCapacity = model.cpus.sumByDouble { it.frequency } - _speed = MutableList(model.cpus.size) { 0.0 } + _speed = DoubleArray(model.cpus.size) { 0.0 } + var totalSpeed = 0.0 workload.onStart(ctx) for ((cpu, source) in resources) { val consumer = workload.getConsumer(ctx, cpu) - val job = source.speed - .onEach { - _speed[cpu.id] = it - _usage.value = _speed.sum() / totalCapacity - } - .launchIn(this) - - launch { - try { - source.consume(consumer) - } finally { - job.cancel() - } + val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed -> + val oldSpeed = _speed[cpu.id] + _speed[cpu.id] = newSpeed + totalSpeed = totalSpeed - oldSpeed + newSpeed + + updateUsage(totalSpeed / totalCapacity) } + + launch { source.consume(adapter) } } } + /** + * This method is invoked when the usage of the machine is updated. + */ + protected open fun updateUsage(usage: Double) { + _usage.value = usage + } + override fun close() { if (!isTerminated) { isTerminated = true diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt index 51b807d2..d5577279 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt @@ -84,12 +84,15 @@ public class SimBareMetalMachine( /** * The power draw of the machine. */ - public val powerDraw: StateFlow = usage - .map { - this.scalingGovernors.forEach { it.onLimit() } - this.scalingDriver.computePower() - } - .stateIn(scope, SharingStarted.Eagerly, 0.0) + public var powerDraw: Double = 0.0 + private set + + override fun updateUsage(usage: Double) { + super.updateUsage(usage) + + scalingGovernors.forEach { it.onLimit() } + powerDraw = scalingDriver.computePower() + } override fun close() { super.close() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt index b4bbf9fb..4d62c383 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt @@ -28,7 +28,7 @@ package org.opendc.simulator.compute.cpufreq public class DemandScalingGovernor : ScalingGovernor { override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic { override fun onLimit() { - ctx.setTarget(ctx.resource.speed.value) + ctx.setTarget(ctx.resource.speed) } override fun toString(): String = "DemandScalingGovernor.Logic" diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt index d109e4d8..1c82253c 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt @@ -59,7 +59,7 @@ public class PStateScalingDriver(states: Map) : ScalingDrive for (ctx in contexts) { targetFreq = max(ctx.target, targetFreq) - totalSpeed += ctx.resource.speed.value + totalSpeed += ctx.resource.speed } val maxFreq = states.lastKey() diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt index 19c06126..c02b6285 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt @@ -35,7 +35,7 @@ internal class DemandScalingGovernorTest { fun testSetDemandLimit() { val ctx = mockk(relaxUnitFun = true) - every { ctx.resource.speed.value } returns 2100.0 + every { ctx.resource.speed } returns 2100.0 val logic = DemandScalingGovernor().createLogic(ctx) diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt index 5c30bc1f..c6f233a6 100644 --- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt +++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt @@ -59,7 +59,7 @@ internal class PStateScalingDriverTest { val resource = mockk() every { cpu.frequency } returns 4100.0 - every { resource.speed.value } returns 1200.0 + every { resource.speed } returns 1200.0 val driver = PStateScalingDriver( sortedMapOf( @@ -84,7 +84,7 @@ internal class PStateScalingDriverTest { val resource = mockk() every { cpu.frequency } returns 4100.0 - every { resource.speed.value } returns 1200.0 + every { resource.speed } returns 1200.0 val driver = PStateScalingDriver( sortedMapOf( @@ -125,11 +125,11 @@ internal class PStateScalingDriverTest { val scalingContext = logic.createContext(cpu, resource) - every { resource.speed.value } returns 1400.0 + every { resource.speed } returns 1400.0 scalingContext.setTarget(1400.0) assertEquals(150.0, logic.computePower()) - every { resource.speed.value } returns 1400.0 + every { resource.speed } returns 1400.0 scalingContext.setTarget(4000.0) assertEquals(235.0, logic.computePower()) } -- cgit v1.2.3