summaryrefslogtreecommitdiff
path: root/simulator/opendc-compute/opendc-compute-simulator
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:33:13 +0100
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-03-27 12:36:18 +0100
commit5b0eaf76ec00192c755b268b7655f6463c5bc62f (patch)
treecf7c04ce21d0da964172ef35deb45c843aea8b98 /simulator/opendc-compute/opendc-compute-simulator
parentccd1f96f8568978c80aa0b9a100ca6158ade34ba (diff)
compute: Migrate compute service simulator to OpenTelemetry
This change updates the compute service simulator to use OpenTelemetry for reporting metrics of the (simulated) hosts as opposed to using custom event flows. This approach is more generic, flexible and possibly offers better performance as we can collect metrics of all services in a single sweep, as opposed to listening to several services and each invoking the handlers.
Diffstat (limited to 'simulator/opendc-compute/opendc-compute-simulator')
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts1
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt117
-rw-r--r--simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt142
3 files changed, 180 insertions, 80 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
index 1ad3f1c6..3bf8a114 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
+++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts
@@ -38,5 +38,6 @@ dependencies {
implementation("io.github.microutils:kotlin-logging")
testImplementation(project(":opendc-simulator:opendc-simulator-core"))
+ testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk"))
testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}")
}
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
index 89784803..6d81aa7d 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt
@@ -22,8 +22,9 @@
package org.opendc.compute.simulator
+import io.opentelemetry.api.metrics.Meter
+import io.opentelemetry.api.metrics.common.Labels
import kotlinx.coroutines.*
-import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import org.opendc.compute.api.Flavor
import org.opendc.compute.api.Server
@@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit
import org.opendc.simulator.compute.power.ConstantPowerModel
import org.opendc.simulator.compute.power.MachinePowerModel
import org.opendc.simulator.failures.FailureDomain
-import org.opendc.utils.flow.EventFlow
import java.time.Clock
import java.util.*
import kotlin.coroutines.CoroutineContext
@@ -52,6 +52,7 @@ public class SimHost(
override val meta: Map<String, Any>,
context: CoroutineContext,
clock: Clock,
+ meter: Meter,
hypervisor: SimHypervisorProvider,
powerModel: MachinePowerModel = ConstantPowerModel(0.0),
private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(),
@@ -66,10 +67,6 @@ public class SimHost(
*/
private val logger = KotlinLogging.logger {}
- override val events: Flow<HostEvent>
- get() = _events
- internal val _events = EventFlow<HostEvent>()
-
/**
* The event listeners registered with this host.
*/
@@ -99,18 +96,13 @@ public class SimHost(
cpuUsage: Double,
cpuDemand: Double
) {
- _events.emit(
- HostEvent.SliceFinished(
- this@SimHost,
- requestedWork,
- grantedWork,
- overcommittedWork,
- interferedWork,
- cpuUsage,
- cpuDemand,
- guests.size
- )
- )
+ _cpuWork.record(requestedWork.toDouble())
+ _cpuWorkGranted.record(grantedWork.toDouble())
+ _cpuWorkOvercommit.record(overcommittedWork.toDouble())
+ _cpuWorkInterference.record(interferedWork.toDouble())
+ _cpuUsage.record(cpuUsage)
+ _cpuDemand.record(cpuDemand)
+ _cpuPower.record(machine.powerDraw.value)
}
}
)
@@ -132,6 +124,87 @@ public class SimHost(
override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum())
+ /**
+ * The number of guests on the host.
+ */
+ private val _guests = meter.longUpDownCounterBuilder("guests.total")
+ .setDescription("Number of guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The number of active guests on the host.
+ */
+ private val _activeGuests = meter.longUpDownCounterBuilder("guests.active")
+ .setDescription("Number of active guests")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU usage on the host.
+ */
+ private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage")
+ .setDescription("The amount of CPU resources used by the host")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The CPU demand on the host.
+ */
+ private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand")
+ .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits")
+ .setUnit("MHz")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The requested work for the CPU.
+ */
+ private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage")
+ .setDescription("The amount of power used by the CPU")
+ .setUnit("W")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The requested work for the CPU.
+ */
+ private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total")
+ .setDescription("The amount of work supplied to the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work actually performed by the CPU.
+ */
+ private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted")
+ .setDescription("The amount of work performed by the CPU")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to overcommitting resource.
+ */
+ private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit")
+ .setDescription("The amount of work not performed by the CPU due to overcommitment")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
+ /**
+ * The work that could not be performed by the CPU due to interference.
+ */
+ private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference")
+ .setDescription("The amount of work not performed by the CPU due to interference")
+ .setUnit("1")
+ .build()
+ .bind(Labels.of("host", uid.toString()))
+
init {
// Launch hypervisor onto machine
scope.launch {
@@ -166,12 +239,11 @@ public class SimHost(
require(canFit(server)) { "Server does not fit" }
val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel()))
guests[server] = guest
+ _guests.add(1)
if (start) {
guest.start()
}
-
- _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override fun contains(server: Server): Boolean {
@@ -191,6 +263,7 @@ public class SimHost(
override suspend fun delete(server: Server) {
val guest = guests.remove(server) ?: return
guest.terminate()
+ _guests.add(-1)
}
override fun addListener(listener: HostListener) {
@@ -228,6 +301,7 @@ public class SimHost(
}
}
+ _activeGuests.add(1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
}
@@ -238,9 +312,8 @@ public class SimHost(
}
}
+ _activeGuests.add(-1)
listeners.forEach { it.onStateChanged(this, vm.server, vm.state) }
-
- _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory))
}
override suspend fun fail() {
diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
index e311cd21..830fc868 100644
--- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
+++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt
@@ -22,12 +22,14 @@
package org.opendc.compute.simulator
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.test.TestCoroutineScope
+import io.opentelemetry.api.metrics.MeterProvider
+import io.opentelemetry.sdk.common.CompletableResultCode
+import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.export.MetricExporter
+import io.opentelemetry.sdk.metrics.export.MetricProducer
+import kotlinx.coroutines.*
+import kotlinx.coroutines.test.runBlockingTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
@@ -37,7 +39,8 @@ import org.opendc.compute.api.Image
import org.opendc.compute.api.Server
import org.opendc.compute.api.ServerState
import org.opendc.compute.api.ServerWatcher
-import org.opendc.compute.service.driver.HostEvent
+import org.opendc.compute.service.driver.Host
+import org.opendc.compute.service.driver.HostListener
import org.opendc.simulator.compute.SimFairShareHypervisorProvider
import org.opendc.simulator.compute.SimMachineModel
import org.opendc.simulator.compute.model.MemoryUnit
@@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode
import org.opendc.simulator.compute.model.ProcessingUnit
import org.opendc.simulator.compute.workload.SimTraceWorkload
import org.opendc.simulator.utils.DelayControllerClockAdapter
-import java.time.Clock
+import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
+import org.opendc.telemetry.sdk.toOtelClock
import java.util.UUID
+import kotlin.coroutines.resume
/**
* Basic test-suite for the hypervisor.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class SimHostTest {
- private lateinit var scope: TestCoroutineScope
- private lateinit var clock: Clock
private lateinit var machineModel: SimMachineModel
@BeforeEach
fun setUp() {
- scope = TestCoroutineScope()
- clock = DelayControllerClockAdapter(scope)
-
val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2)
machineModel = SimMachineModel(
@@ -74,72 +74,98 @@ internal class SimHostTest {
* Test overcommitting of resources by the hypervisor.
*/
@Test
- fun testOvercommitted() {
+ fun testOvercommitted() = runBlockingTest {
+ val clock = DelayControllerClockAdapter(this)
var requestedWork = 0L
var grantedWork = 0L
var overcommittedWork = 0L
- scope.launch {
- val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider())
- val duration = 5 * 60L
- val vmImageA = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
- ),
- )
+ val meterProvider: MeterProvider = SdkMeterProvider
+ .builder()
+ .setClock(clock.toOtelClock())
+ .build()
+
+ val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider())
+ val duration = 5 * 60L
+ val vmImageA = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 183.0, 2)
+ ),
)
)
- val vmImageB = MockImage(
- UUID.randomUUID(),
- "<unnamed>",
- emptyMap(),
- mapOf(
- "workload" to SimTraceWorkload(
- sequenceOf(
- SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
- SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
- )
+ )
+ val vmImageB = MockImage(
+ UUID.randomUUID(),
+ "<unnamed>",
+ emptyMap(),
+ mapOf(
+ "workload" to SimTraceWorkload(
+ sequenceOf(
+ SimTraceWorkload.Fragment(duration * 1000, 28.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 0.0, 2),
+ SimTraceWorkload.Fragment(duration * 1000, 73.0, 2)
)
)
)
+ )
- delay(5)
-
- val flavor = MockFlavor(2, 0)
- virtDriver.events
- .onEach { event ->
- when (event) {
- is HostEvent.SliceFinished -> {
- requestedWork += event.requestedBurst
- grantedWork += event.grantedBurst
- overcommittedWork += event.overcommissionedBurst
- }
- }
+ val flavor = MockFlavor(2, 0)
+
+ // Setup metric reader
+ val reader = CoroutineMetricReader(
+ this, listOf(meterProvider as MetricProducer),
+ object : MetricExporter {
+ override fun export(metrics: Collection<MetricData>): CompletableResultCode {
+ val metricsByName = metrics.associateBy { it.name }
+ requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong()
+ grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong()
+ overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong()
+ return CompletableResultCode.ofSuccess()
}
- .launchIn(this)
+ override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
+
+ override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
+ },
+ exportInterval = duration * 1000
+ )
+
+ coroutineScope {
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) }
launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) }
+
+ suspendCancellableCoroutine<Unit> { cont ->
+ virtDriver.addListener(object : HostListener {
+ private var finished = 0
+
+ override fun onStateChanged(host: Host, server: Server, newState: ServerState) {
+ if (newState == ServerState.TERMINATED && ++finished == 2) {
+ cont.resume(Unit)
+ }
+ }
+ })
+ }
}
- scope.advanceUntilIdle()
+ // Ensure last cycle is collected
+ delay(1000 * duration)
+ virtDriver.close()
+ reader.close()
assertAll(
- { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") },
{ assertEquals(4197600, requestedWork, "Requested work does not match") },
{ assertEquals(2157600, grantedWork, "Granted work does not match") },
{ assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") },
- { assertEquals(1200006, scope.currentTime) }
+ { assertEquals(1500001, currentTime) }
)
}