summaryrefslogtreecommitdiff
path: root/simulator/opendc-simulator/opendc-simulator-compute
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-07 16:26:00 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-04-07 20:38:46 +0200
commit4e9f72b50473d73f9ca9e30a7fbeb97a8a1c0555 (patch)
tree6d4855c6a93cfc41064e73b169e2f39d5530a5ae /simulator/opendc-simulator/opendc-simulator-compute
parent95a0ed6911f136fb25bb76d6b6e010bf66b8ba5b (diff)
simulator: Move away from StateFlow for low-level monitoring
This change removes the StateFlow speed property on the SimResourceSource, as the overhead of emitting changes to the StateFlow is too high in a single-thread context. Our new approach is to use direct callbacks and counters.
Diffstat (limited to 'simulator/opendc-simulator/opendc-simulator-compute')
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt38
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt15
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt2
-rw-r--r--simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt8
6 files changed, 36 insertions, 31 deletions
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
index 2127b066..1f26c9c9 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimAbstractMachine.kt
@@ -25,14 +25,13 @@ package org.opendc.simulator.compute
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimWorkload
import org.opendc.simulator.resources.SimResourceProvider
import org.opendc.simulator.resources.SimResourceSource
import org.opendc.simulator.resources.consume
+import org.opendc.simulator.resources.consumer.SimSpeedConsumerAdapter
import java.time.Clock
import kotlin.coroutines.CoroutineContext
@@ -47,9 +46,9 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
/**
* The speed of the CPU cores.
*/
- public val speed: List<Double>
+ public val speed: DoubleArray
get() = _speed
- private var _speed = mutableListOf<Double>()
+ private var _speed = doubleArrayOf()
/**
* A flag to indicate that the machine is terminated.
@@ -94,29 +93,32 @@ public abstract class SimAbstractMachine(private val clock: Clock) : SimMachine
val ctx = Context(resources, meta)
val totalCapacity = model.cpus.sumByDouble { it.frequency }
- _speed = MutableList(model.cpus.size) { 0.0 }
+ _speed = DoubleArray(model.cpus.size) { 0.0 }
+ var totalSpeed = 0.0
workload.onStart(ctx)
for ((cpu, source) in resources) {
val consumer = workload.getConsumer(ctx, cpu)
- val job = source.speed
- .onEach {
- _speed[cpu.id] = it
- _usage.value = _speed.sum() / totalCapacity
- }
- .launchIn(this)
-
- launch {
- try {
- source.consume(consumer)
- } finally {
- job.cancel()
- }
+ val adapter = SimSpeedConsumerAdapter(consumer) { newSpeed ->
+ val oldSpeed = _speed[cpu.id]
+ _speed[cpu.id] = newSpeed
+ totalSpeed = totalSpeed - oldSpeed + newSpeed
+
+ updateUsage(totalSpeed / totalCapacity)
}
+
+ launch { source.consume(adapter) }
}
}
+ /**
+ * This method is invoked when the usage of the machine is updated.
+ */
+ protected open fun updateUsage(usage: Double) {
+ _usage.value = usage
+ }
+
override fun close() {
if (!isTerminated) {
isTerminated = true
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
index 51b807d2..d5577279 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/SimBareMetalMachine.kt
@@ -84,12 +84,15 @@ public class SimBareMetalMachine(
/**
* The power draw of the machine.
*/
- public val powerDraw: StateFlow<Double> = usage
- .map {
- this.scalingGovernors.forEach { it.onLimit() }
- this.scalingDriver.computePower()
- }
- .stateIn(scope, SharingStarted.Eagerly, 0.0)
+ public var powerDraw: Double = 0.0
+ private set
+
+ override fun updateUsage(usage: Double) {
+ super.updateUsage(usage)
+
+ scalingGovernors.forEach { it.onLimit() }
+ powerDraw = scalingDriver.computePower()
+ }
override fun close() {
super.close()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
index b4bbf9fb..4d62c383 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernor.kt
@@ -28,7 +28,7 @@ package org.opendc.simulator.compute.cpufreq
public class DemandScalingGovernor : ScalingGovernor {
override fun createLogic(ctx: ScalingContext): ScalingGovernor.Logic = object : ScalingGovernor.Logic {
override fun onLimit() {
- ctx.setTarget(ctx.resource.speed.value)
+ ctx.setTarget(ctx.resource.speed)
}
override fun toString(): String = "DemandScalingGovernor.Logic"
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
index d109e4d8..1c82253c 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriver.kt
@@ -59,7 +59,7 @@ public class PStateScalingDriver(states: Map<Double, PowerModel>) : ScalingDrive
for (ctx in contexts) {
targetFreq = max(ctx.target, targetFreq)
- totalSpeed += ctx.resource.speed.value
+ totalSpeed += ctx.resource.speed
}
val maxFreq = states.lastKey()
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
index 19c06126..c02b6285 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/DemandScalingGovernorTest.kt
@@ -35,7 +35,7 @@ internal class DemandScalingGovernorTest {
fun testSetDemandLimit() {
val ctx = mockk<ScalingContext>(relaxUnitFun = true)
- every { ctx.resource.speed.value } returns 2100.0
+ every { ctx.resource.speed } returns 2100.0
val logic = DemandScalingGovernor().createLogic(ctx)
diff --git a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
index 5c30bc1f..c6f233a6 100644
--- a/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
+++ b/simulator/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/cpufreq/PStateScalingDriverTest.kt
@@ -59,7 +59,7 @@ internal class PStateScalingDriverTest {
val resource = mockk<SimResourceSource>()
every { cpu.frequency } returns 4100.0
- every { resource.speed.value } returns 1200.0
+ every { resource.speed } returns 1200.0
val driver = PStateScalingDriver(
sortedMapOf(
@@ -84,7 +84,7 @@ internal class PStateScalingDriverTest {
val resource = mockk<SimResourceSource>()
every { cpu.frequency } returns 4100.0
- every { resource.speed.value } returns 1200.0
+ every { resource.speed } returns 1200.0
val driver = PStateScalingDriver(
sortedMapOf(
@@ -125,11 +125,11 @@ internal class PStateScalingDriverTest {
val scalingContext = logic.createContext(cpu, resource)
- every { resource.speed.value } returns 1400.0
+ every { resource.speed } returns 1400.0
scalingContext.setTarget(1400.0)
assertEquals(150.0, logic.computePower())
- every { resource.speed.value } returns 1400.0
+ every { resource.speed } returns 1400.0
scalingContext.setTarget(4000.0)
assertEquals(235.0, logic.computePower())
}