diff options
| author | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-25 21:50:45 +0100 |
|---|---|---|
| committer | Fabian Mastenbroek <mail.fabianm@gmail.com> | 2021-03-26 15:41:05 +0100 |
| commit | 608ff59b2d7e8ce696fe6f7271d80b5efc9c4b87 (patch) | |
| tree | f0130622f189815e41837993b6f66ba3fc11b899 /simulator/opendc-experiments/opendc-experiments-capelin/src | |
| parent | 0d66ef47d6e1ec0861b4939800c5070f96600ca0 (diff) | |
compute: Integrate OpenTelemetry Metrics in OpenDC Compute
This change integrates the OpenTelemetry Metrics API in the OpenDC
Compute Service implementation. This replaces the old infrastructure for
gathering metrics.
Diffstat (limited to 'simulator/opendc-experiments/opendc-experiments-capelin/src')
5 files changed, 201 insertions, 156 deletions
diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 6f99a44e..4f48bba7 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -22,6 +22,11 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.sdk.common.CompletableResultCode +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.launchIn @@ -29,7 +34,6 @@ import kotlinx.coroutines.flow.onEach import mu.KotlinLogging import org.opendc.compute.api.* import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostEvent import org.opendc.compute.service.driver.HostListener @@ -45,8 +49,10 @@ import org.opendc.simulator.compute.interference.PerformanceInterferenceModel import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector +import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader import java.io.File import java.time.Clock +import kotlin.coroutines.coroutineContext import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max @@ -130,12 +136,13 @@ public fun createTraceReader( /** * Construct the environment for a simulated compute service.. */ -public fun createComputeService( - coroutineScope: CoroutineScope, +public suspend fun withComputeService( clock: Clock, + meter: Meter, environmentReader: EnvironmentReader, - allocationPolicy: AllocationPolicy -): ComputeService { + allocationPolicy: AllocationPolicy, + block: suspend CoroutineScope.(ComputeService) -> Unit +): Unit = coroutineScope { val hosts = environmentReader .use { it.read() } .map { def -> @@ -144,7 +151,7 @@ public fun createComputeService( def.name, def.model, def.meta, - coroutineScope.coroutineContext, + coroutineContext, clock, SimFairShareHypervisorProvider(), def.powerModel @@ -152,26 +159,33 @@ public fun createComputeService( } val scheduler = - ComputeService(coroutineScope.coroutineContext, clock, allocationPolicy) + ComputeService(coroutineContext, clock, meter, allocationPolicy) for (host in hosts) { scheduler.addHost(host) } - return scheduler + try { + block(this, scheduler) + } finally { + scheduler.close() + hosts.forEach(SimHost::close) + } } /** * Attach the specified monitor to the VM provisioner. */ @OptIn(ExperimentalCoroutinesApi::class) -public fun attachMonitor( - coroutineScope: CoroutineScope, +public suspend fun withMonitor( + monitor: ExperimentMonitor, clock: Clock, + metricProducer: MetricProducer, scheduler: ComputeService, - monitor: ExperimentMonitor -): MonitorResults { - val results = MonitorResults() + block: suspend CoroutineScope.() -> Unit +): Unit = coroutineScope { + val monitorJobs = mutableSetOf<Job>() + // Monitor host events for (host in scheduler.hosts) { monitor.reportHostStateChange(clock.millis(), host, HostState.UP) @@ -181,7 +195,7 @@ public fun attachMonitor( } }) - host.events + monitorJobs += host.events .onEach { event -> when (event) { is HostEvent.SliceFinished -> monitor.reportHostSlice( @@ -197,37 +211,81 @@ public fun attachMonitor( ) } } - .launchIn(coroutineScope) + .launchIn(this) - (host as SimHost).machine.powerDraw + monitorJobs += (host as SimHost).machine.powerDraw .onEach { monitor.reportPowerConsumption(host, it) } - .launchIn(coroutineScope) + .launchIn(this) } - scheduler.events - .onEach { event -> - when (event) { - is ComputeServiceEvent.MetricsAvailable -> { - results.submittedVms = event.totalVmCount - results.queuedVms = event.waitingVmCount - results.runningVms = event.activeVmCount - results.finishedVms = event.inactiveVmCount - results.unscheduledVms = event.failedVmCount - monitor.reportProvisionerMetrics(clock.millis(), event) - } + val reader = CoroutineMetricReader( + this, listOf(metricProducer), + object : MetricExporter { + override fun export(metrics: Collection<MetricData>): CompletableResultCode { + val metricsByName = metrics.associateBy { it.name } + + val submittedVms = metricsByName["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val queuedVms = metricsByName["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val unscheduledVms = metricsByName["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val runningVms = metricsByName["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val finishedVms = metricsByName["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val hosts = metricsByName["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + val availableHosts = metricsByName["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + + monitor.reportProvisionerMetrics( + clock.millis(), + hosts, + availableHosts, + submittedVms, + runningVms, + finishedVms, + queuedVms, + unscheduledVms + ) + return CompletableResultCode.ofSuccess() } - } - .launchIn(coroutineScope) - return results + override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() + + override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() + }, + exportInterval = 5 * 60 * 1000 + ) + + try { + block(this) + } finally { + monitorJobs.forEach(Job::cancel) + reader.close() + monitor.close() + } } -public class MonitorResults { +public class ComputeMetrics { public var submittedVms: Int = 0 public var queuedVms: Int = 0 public var runningVms: Int = 0 - public var finishedVms: Int = 0 public var unscheduledVms: Int = 0 + public var finishedVms: Int = 0 +} + +/** + * Collect the metrics of the compute service. + */ +public fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { + val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + val res = ComputeMetrics() + try { + // Hack to extract metrics from OpenTelemetry SDK + res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + } catch (cause: Throwable) { + logger.warn(cause) { "Failed to collect metrics" } + } + return res } /** @@ -242,12 +300,17 @@ public suspend fun processTrace( ) { val client = scheduler.newClient() val image = client.newImage("vm-image") + var offset = Long.MIN_VALUE try { coroutineScope { while (reader.hasNext()) { val entry = reader.next() - delay(max(0, entry.start - clock.millis())) + if (offset < 0) { + offset = entry.start - clock.millis() + } + + delay(max(0, (entry.start - offset) - clock.millis())) launch { chan.send(Unit) val server = client.newServer( diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index 46e0bcb9..2921daba 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -22,11 +22,13 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.test.runBlockingTest import mu.KotlinLogging import org.opendc.compute.service.scheduler.* import org.opendc.experiments.capelin.model.CompositeWorkload @@ -41,6 +43,7 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random @@ -110,15 +113,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { * Perform a single trial for this portfolio. */ @OptIn(ExperimentalCoroutinesApi::class) - override fun doRun(repeat: Int) { - val testScope = TestCoroutineScope() - val clock = DelayControllerClockAdapter(testScope) + override fun doRun(repeat: Int): Unit = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seeder = Random(repeat) val environment = Sc20ClusterEnvironmentReader(File(environmentPath, "${topology.name}.txt")) val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = createAllocationPolicy(seeder) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter = meterProvider.get("opendc-compute") + val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } @@ -144,14 +153,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { 4096 ) - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environment, - allocationPolicy - ) - + withComputeService(clock, meter, environment, allocationPolicy) { scheduler -> val failureDomain = if (operationalPhenomena.failureFrequency > 0) { logger.debug("ENABLING failures") createFailureDomain( @@ -166,30 +168,21 @@ public abstract class Portfolio(name: String) : Experiment(name) { null } - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - trace, - scheduler, - chan, - monitor - ) - - logger.debug("SUBMIT=${monitorResults.submittedVms}") - logger.debug("FAIL=${monitorResults.unscheduledVms}") - logger.debug("QUEUED=${monitorResults.queuedVms}") - logger.debug("RUNNING=${monitorResults.runningVms}") - logger.debug("FINISHED=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + trace, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() } - try { - testScope.advanceUntilIdle() - } finally { - monitor.close() - } + val monitorResults = collectMetrics(meterProvider as MetricProducer) + logger.debug { "Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}" } } /** diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 14cc06dc..a57c8d78 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -24,7 +24,6 @@ package org.opendc.experiments.capelin.monitor import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import java.io.Closeable @@ -68,5 +67,14 @@ public interface ExperimentMonitor : Closeable { /** * This method is invoked for a provisioner event. */ - public fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) {} + public fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) {} } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index c9d57a98..0e675d87 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -25,7 +25,6 @@ package org.opendc.experiments.capelin.monitor import mu.KotlinLogging import org.opendc.compute.api.Server import org.opendc.compute.api.ServerState -import org.opendc.compute.service.ComputeServiceEvent import org.opendc.compute.service.driver.Host import org.opendc.compute.service.driver.HostState import org.opendc.experiments.capelin.telemetry.HostEvent @@ -172,17 +171,26 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: } } - override fun reportProvisionerMetrics(time: Long, event: ComputeServiceEvent.MetricsAvailable) { + override fun reportProvisionerMetrics( + time: Long, + totalHostCount: Int, + availableHostCount: Int, + totalVmCount: Int, + activeVmCount: Int, + inactiveVmCount: Int, + waitingVmCount: Int, + failedVmCount: Int + ) { provisionerWriter.write( ProvisionerEvent( time, - event.totalHostCount, - event.availableHostCount, - event.totalVmCount, - event.activeVmCount, - event.inactiveVmCount, - event.waitingVmCount, - event.failedVmCount + totalHostCount, + availableHostCount, + totalVmCount, + activeVmCount, + inactiveVmCount, + waitingVmCount, + failedVmCount ) ) } diff --git a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a836b334..fd906f4d 100644 --- a/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/simulator/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,18 +22,18 @@ package org.opendc.experiments.capelin +import io.opentelemetry.api.metrics.Meter +import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.TestCoroutineScope -import kotlinx.coroutines.yield -import org.junit.jupiter.api.AfterEach +import kotlinx.coroutines.test.runBlockingTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.service.ComputeService import org.opendc.compute.service.driver.Host import org.opendc.compute.service.scheduler.AvailableCoreMemoryAllocationPolicy import org.opendc.experiments.capelin.model.Workload @@ -45,8 +45,8 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.utils.DelayControllerClockAdapter +import org.opendc.telemetry.sdk.toOtelClock import java.io.File -import java.time.Clock /** * An integration test suite for the SC20 experiments. @@ -54,16 +54,6 @@ import java.time.Clock @OptIn(ExperimentalCoroutinesApi::class) class CapelinIntegrationTest { /** - * The [TestCoroutineScope] to use. - */ - private lateinit var testScope: TestCoroutineScope - - /** - * The simulation clock to use. - */ - private lateinit var clock: Clock - - /** * The monitor used to keep track of the metrics. */ private lateinit var monitor: TestExperimentReporter @@ -73,37 +63,28 @@ class CapelinIntegrationTest { */ @BeforeEach fun setUp() { - testScope = TestCoroutineScope() - clock = DelayControllerClockAdapter(testScope) - monitor = TestExperimentReporter() } - /** - * Tear down the experimental environment. - */ - @AfterEach - fun tearDown() = testScope.cleanupTestCoroutines() - @Test - fun testLarge() { + fun testLarge() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val failures = false val seed = 0 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader() val environmentReader = createTestEnvironmentReader() - lateinit var scheduler: ComputeService - lateinit var monitorResults: MonitorResults + lateinit var monitorResults: ComputeMetrics - testScope.launch { - scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") createFailureDomain( @@ -118,28 +99,28 @@ class CapelinIntegrationTest { null } - monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } failureDomain?.cancel() - scheduler.close() - monitor.close() } - runSimulation() + monitorResults = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms}") // Note that these values have been verified beforehand assertAll( { assertEquals(50, monitorResults.submittedVms, "The trace contains 50 VMs") }, - { assertEquals(50, monitorResults.finishedVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, + { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, + { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, { assertEquals(1672916917970, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, { assertEquals(435179794565, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, { assertEquals(1236692477983, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, @@ -148,38 +129,35 @@ class CapelinIntegrationTest { } @Test - fun testSmall() { + fun testSmall() = runBlockingTest { + val clock = DelayControllerClockAdapter(this) val seed = 1 val chan = Channel<Unit>(Channel.CONFLATED) val allocationPolicy = AvailableCoreMemoryAllocationPolicy() val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - testScope.launch { - val scheduler = createComputeService( - this, - clock, - environmentReader, - allocationPolicy - ) - val monitorResults = attachMonitor(this, clock, scheduler, monitor) - processTrace( - clock, - traceReader, - scheduler, - chan, - monitor - ) - - yield() - - println("Finish SUBMIT=${monitorResults.submittedVms} FAIL=${monitorResults.unscheduledVms} QUEUE=${monitorResults.queuedVms} RUNNING=${monitorResults.runningVms} FINISH=${monitorResults.finishedVms}") - - scheduler.close() - monitor.close() + val meterProvider: MeterProvider = SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .build() + + val meter: Meter = meterProvider.get("opendc-compute") + + withComputeService(clock, meter, environmentReader, allocationPolicy) { scheduler -> + withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { + processTrace( + clock, + traceReader, + scheduler, + chan, + monitor + ) + } } - runSimulation() + val metrics = collectMetrics(meterProvider as MetricProducer) + println("Finish SUBMIT=${metrics.submittedVms} FAIL=${metrics.unscheduledVms} QUEUE=${metrics.queuedVms} RUNNING=${metrics.runningVms}") // Note that these values have been verified beforehand assertAll( @@ -191,11 +169,6 @@ class CapelinIntegrationTest { } /** - * Run the simulation. - */ - private fun runSimulation() = testScope.advanceUntilIdle() - - /** * Obtain the trace reader for the test. */ private fun createTestTraceReader(fraction: Double = 1.0, seed: Int = 0): TraceReader<SimWorkload> { |
