From 608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Thu, 25 Mar 2021 21:50:45 +0100 Subject: compute: Integrate OpenTelemetry Metrics in OpenDC Compute This change integrates the OpenTelemetry Metrics API in the OpenDC Compute Service implementation. This replaces the old infrastructure for gathering metrics. --- .../experiments/capelin/CapelinIntegrationTest.kt | 129 ++++++++------------- 1 file changed, 51 insertions(+), 78 deletions(-) (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src/test') diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a836b334..fd906f4d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,18 +22,18 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.yield -import org.junit.jupiter.api.AfterEach +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload @@ -45,24 +45,14 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import java.time.Clock /** * An integration test suite for the SC20 experiments. */ @OptIn(ExperimentalCoroutinesApi::class) class CapelinIntegrationTest { - /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - /** * The monitor used to keep track of the metrics. */ @@ -73,37 +63,28 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - monitor = TestExperimentReporter() } - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - @Test - fun testLarge() { + fun testLarge() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val failures = false val seed = 0 val chan = Channel(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeService - lateinit var monitorResults: MonitorResults + lateinit var monitorResults: ComputeMetrics - testScope.launch { - scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -118,28 +99,28 @@ class CapelinIntegrationTest { null } - monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() - monitor.close() } - runSimulation() + monitorResults = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") // Note that these values have been verified beforehand assertAll( { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, + { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -148,38 +129,35 @@ class CapelinIntegrationTest { } @Test - fun testSmall() { + fun testSmall() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - yield() - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") - - scheduler.close() - monitor.close() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } } - runSimulation() + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") // Note that these values have been verified beforehand assertAll( @@ -190,11 +168,6 @@ class CapelinIntegrationTest { ) } - /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - /** * Obtain the trace reader for the test. */ -- cgit v1.2.3