diff options
Diffstat (limited to 'simulator/opendc-compute/opendc-compute-simulator')
3 files changed, 180 insertions, 80 deletions
diff --git a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts index 1ad3f1c6..3bf8a114 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/simulator/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -38,5 +38,6 @@ dependencies { implementation("io.github.microutils:kotlin-logging") testImplementation(project(":opendc-simulator:opendc-simulator-core")) + testImplementation(project(":opendc-telemetry:opendc-telemetry-sdk")) testRuntimeOnly("org.slf4j:slf4j-simple:${versions.slf4j}") } diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 89784803..6d81aa7d 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,8 +22,9 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* -import kotlinx.coroutines.flow.Flow import mu.KotlinLogging import org.opendc.compute.api.Flavor import org.opendc.compute.api.Server @@ -36,7 +37,6 @@ import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.power.ConstantPowerModel import org.opendc.simulator.compute.power.MachinePowerModel import org.opendc.simulator.failures.FailureDomain -import org.opendc.utils.flow.EventFlow import java.time.Clock import java.util.* import kotlin.coroutines.CoroutineContext @@ -52,6 +52,7 @@ public class SimHost( override val meta: Map<String, Any>, context: CoroutineContext, clock: Clock, + meter: Meter, hypervisor: SimHypervisorProvider, powerModel: MachinePowerModel = ConstantPowerModel(0.0), private val mapper: SimWorkloadMapper = SimMetaWorkloadMapper(), @@ -66,10 +67,6 @@ public class SimHost( */ private val logger = KotlinLogging.logger {} - override val events: Flow<HostEvent> - get() = _events - internal val _events = EventFlow<HostEvent>() - /** * The event listeners registered with this host. */ @@ -99,18 +96,13 @@ public class SimHost( cpuUsage: Double, cpuDemand: Double ) { - _events.emit( - HostEvent.SliceFinished( - this@SimHost, - requestedWork, - grantedWork, - overcommittedWork, - interferedWork, - cpuUsage, - cpuDemand, - guests.size - ) - ) + _cpuWork.record(requestedWork.toDouble()) + _cpuWorkGranted.record(grantedWork.toDouble()) + _cpuWorkOvercommit.record(overcommittedWork.toDouble()) + _cpuWorkInterference.record(interferedWork.toDouble()) + _cpuUsage.record(cpuUsage) + _cpuDemand.record(cpuDemand) + _cpuPower.record(machine.powerDraw.value) } } ) @@ -132,6 +124,87 @@ public class SimHost( override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + /** + * The number of guests on the host. + */ + private val _guests = meter.longUpDownCounterBuilder("guests.total") + .setDescription("Number of guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The number of active guests on the host. + */ + private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") + .setDescription("Number of active guests") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU usage on the host. + */ + private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") + .setDescription("The amount of CPU resources used by the host") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The CPU demand on the host. + */ + private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") + .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + .setUnit("MHz") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") + .setDescription("The amount of power used by the CPU") + .setUnit("W") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The requested work for the CPU. + */ + private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") + .setDescription("The amount of work supplied to the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work actually performed by the CPU. + */ + private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") + .setDescription("The amount of work performed by the CPU") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to overcommitting resource. + */ + private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") + .setDescription("The amount of work not performed by the CPU due to overcommitment") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + + /** + * The work that could not be performed by the CPU due to interference. + */ + private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") + .setDescription("The amount of work not performed by the CPU due to interference") + .setUnit("1") + .build() + .bind(Labels.of("host", uid.toString())) + init { // Launch hypervisor onto machine scope.launch { @@ -166,12 +239,11 @@ public class SimHost( require(canFit(server)) { "Server does not fit" } val guest = Guest(server, hypervisor.createMachine(server.flavor.toMachineModel())) guests[server] = guest + _guests.add(1) if (start) { guest.start() } - - _events.emit(HostEvent.VmsUpdated(this, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override fun contains(server: Server): Boolean { @@ -191,6 +263,7 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return guest.terminate() + _guests.add(-1) } override fun addListener(listener: HostListener) { @@ -228,6 +301,7 @@ public class SimHost( } } + _activeGuests.add(1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } } @@ -238,9 +312,8 @@ public class SimHost( } } + _activeGuests.add(-1) listeners.forEach { it.onStateChanged(this, vm.server, vm.state) } - - _events.emit(HostEvent.VmsUpdated(this@SimHost, guests.count { it.value.state == ServerState.RUNNING }, availableMemory)) } override suspend fun fail() { diff --git a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index e311cd21..830fc868 100644 --- a/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/simulator/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -22,12 +22,14 @@ package org.opendc.compute.simulator -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer +import kotlinx.coroutines.* +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test @@ -37,7 +39,8 @@ import org.opendc.compute.api.Image import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState import org.opendc.compute.api.ServerWatcher -import org.opendc.compute.service.driver.HostEvent +import org.opendc.compute.service.driver.Host +import org.opendc.compute.service.driver.HostListener import org.opendc.simulator.compute.SimFairShareHypervisorProvider import org.opendc.simulator.compute.SimMachineModel import org.opendc.simulator.compute.model.MemoryUnit @@ -45,23 +48,20 @@ import org.opendc.simulator.compute.model.ProcessingNode import org.opendc.simulator.compute.model.ProcessingUnit import org.opendc.simulator.compute.workload.SimTraceWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter -import java.time.Clock +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.sdk.toOtelClock import java.util.UUID +import kotlin.coroutines.resume /** * Basic test-suite for the hypervisor. */ @OptIn(ExperimentalCoroutinesApi::class) internal class SimHostTest { - private lateinit var scope: TestCoroutineScope - private lateinit var clock: Clock private lateinit var machineModel: SimMachineModel @BeforeEach fun setUp() { - scope = TestCoroutineScope() - clock = DelayControllerClockAdapter(scope) - val cpuNode = ProcessingNode("Intel", "Xeon", "amd64", 2) machineModel = SimMachineModel( @@ -74,72 +74,98 @@ internal class SimHostTest { * Test overcommitting of resources by the hypervisor. */ @Test - fun testOvercommitted() { + fun testOvercommitted() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) var requestedWork = 0L var grantedWork = 0L var overcommittedWork = 0L - scope.launch { - val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, SimFairShareHypervisorProvider()) - val duration = 5 * 60L - val vmImageA = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) - ), - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val virtDriver = SimHost(UUID.randomUUID(), "test", machineModel, emptyMap(), coroutineContext, clock, meterProvider.get("opendc-compute-simulator"), SimFairShareHypervisorProvider()) + val duration = 5 * 60L + val vmImageA = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3500.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 183.0, 2) + ), ) ) - val vmImageB = MockImage( - UUID.randomUUID(), - "<unnamed>", - emptyMap(), - mapOf( - "workload" to SimTraceWorkload( - sequenceOf( - SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), - SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) - ) + ) + val vmImageB = MockImage( + UUID.randomUUID(), + "<unnamed>", + emptyMap(), + mapOf( + "workload" to SimTraceWorkload( + sequenceOf( + SimTraceWorkload.Fragment(duration * 1000, 28.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 3100.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 0.0, 2), + SimTraceWorkload.Fragment(duration * 1000, 73.0, 2) ) ) ) + ) - delay(5) - - val flavor = MockFlavor(2, 0) - virtDriver.events - .onEach { event -> - when (event) { - is HostEvent.SliceFinished -> { - requestedWork += event.requestedBurst - grantedWork += event.grantedBurst - overcommittedWork += event.overcommissionedBurst - } - } + val flavor = MockFlavor(2, 0) + + // Setup metric reader + val reader = CoroutineMetricReader( + this, listOf(meterProvider as MetricProducer), + object : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + requestedWork += metricsByName.getValue("cpu.work.total").doubleSummaryData.points.first().sum.toLong() + grantedWork += metricsByName.getValue("cpu.work.granted").doubleSummaryData.points.first().sum.toLong() + overcommittedWork += metricsByName.getValue("cpu.work.overcommit").doubleSummaryData.points.first().sum.toLong() + return CompletableResultCode.ofSuccess() } - .launchIn(this) + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = duration * 1000 + ) + + coroutineScope { launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "a", flavor, vmImageA)) } launch { virtDriver.spawn(MockServer(UUID.randomUUID(), "b", flavor, vmImageB)) } + + suspendCancellableCoroutine<Unit> { cont -> + virtDriver.addListener(object : HostListener { + private var finished = 0 + + override fun onStateChanged(host: Host, server: Server, newState: ServerState) { + if (newState == ServerState.TERMINATED && ++finished == 2) { + cont.resume(Unit) + } + } + }) + } } - scope.advanceUntilIdle() + // Ensure last cycle is collected + delay(1000 * duration) + virtDriver.close() + reader.close() assertAll( - { assertEquals(emptyList<Throwable>(), scope.uncaughtExceptions, "No errors") }, { assertEquals(4197600, requestedWork, "Requested work does not match") }, { assertEquals(2157600, grantedWork, "Granted work does not match") }, { assertEquals(2040000, overcommittedWork, "Overcommitted work does not match") }, - { assertEquals(1200006, scope.currentTime) } + { assertEquals(1500001, currentTime) } ) } |
