From f111081627280d4e7e1d7147c56cdce708e32433 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 14:06:39 +0200 Subject: build: Upgrade to OpenTelemetry 1.5 This change upgrades the OpenTelemetry dependency to version 1.5, which contains various breaking changes in the metrics API. --- .../opendc-experiments-capelin/build.gradle.kts | 1 + .../experiments/capelin/ExperimentHelpers.kt | 13 ---- .../capelin/monitor/ExperimentMetricExporter.kt | 77 ++++++++++------------ .../capelin/monitor/ExperimentMonitor.kt | 3 +- .../capelin/monitor/ParquetExperimentMonitor.kt | 8 +-- .../experiments/capelin/CapelinIntegrationTest.kt | 34 +++++----- .../opendc/experiments/tf20/core/SimTFDevice.kt | 16 +++-- 7 files changed, 70 insertions(+), 82 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 324cae3e..53643aba 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) implementation(libs.config) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index d7df4454..4cffb8d3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,11 +24,7 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory -import io.opentelemetry.sdk.metrics.common.InstrumentType import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.view.InstrumentSelector -import io.opentelemetry.sdk.metrics.view.View import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging @@ -298,18 +294,9 @@ suspend fun processTrace( * Create a [MeterProvider] instance for the experiment. */ fun createMeterProvider(clock: Clock): MeterProvider { - val powerSelector = InstrumentSelector.builder() - .setInstrumentNameRegex("power\\.usage") - .setInstrumentType(InstrumentType.VALUE_RECORDER) - .build() - val powerView = View.builder() - .setAggregatorFactory(AggregatorFactory.lastValue()) - .build() - return SdkMeterProvider .builder() .setClock(clock.toOtelClock()) - .registerView(powerSelector, powerView) .build() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 7fb2f83c..16358817 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -22,10 +22,10 @@ package org.opendc.experiments.capelin.monitor -import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.compute.service.driver.Host import java.time.Clock @@ -37,7 +37,7 @@ public class ExperimentMetricExporter( private val clock: Clock, private val hosts: Map ) : MetricExporter { - private val hostKey = AttributeKey.stringKey("host") + private val hostKey = ResourceAttributes.HOST_ID override fun export(metrics: Collection): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } @@ -46,50 +46,31 @@ public class ExperimentMetricExporter( return CompletableResultCode.ofSuccess() } + private var lastHostMetrics: Map = emptyMap() + private val hostMetricsSingleton = HostMetrics() + private fun reportHostMetrics(metrics: Map) { val hostMetrics = mutableMapOf() hosts.mapValuesTo(hostMetrics) { HostMetrics() } - mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> - m.cpuDemand = v - } - - mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> - m.cpuUsage = v - } - - mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> - m.powerDraw = v - } - - mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> - m.requestedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> - m.grantedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> - m.overcommissionedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v -> - m.interferedBurst = v.toLong() - } - - mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> - m.numberOfDeployedImages = v.toInt() - } + mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v } + mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v } + mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v } + mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v } + mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v } + mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v } + mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v } + mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() } for ((id, hostMetric) in hostMetrics) { + val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) monitor.reportHostSlice( clock.millis(), - hostMetric.requestedBurst, - hostMetric.grantedBurst, - hostMetric.overcommissionedBurst, - hostMetric.interferedBurst, + (hostMetric.requestedBurst - lastHostMetric.requestedBurst).toLong(), + (hostMetric.grantedBurst - lastHostMetric.grantedBurst).toLong(), + (hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst).toLong(), + (hostMetric.interferedBurst - lastHostMetric.interferedBurst).toLong(), hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, @@ -97,6 +78,8 @@ public class ExperimentMetricExporter( host ) } + + lastHostMetrics = hostMetrics } private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { @@ -137,6 +120,18 @@ public class ExperimentMetricExporter( } } + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.attributes[hostKey] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + private fun reportProvisionerMetrics(metrics: Map) { val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 @@ -159,10 +154,10 @@ public class ExperimentMetricExporter( } private class HostMetrics { - var requestedBurst: Long = 0 - var grantedBurst: Long = 0 - var overcommissionedBurst: Long = 0 - var interferedBurst: Long = 0 + var requestedBurst: Double = 0.0 + var grantedBurst: Double = 0.0 + var overcommissionedBurst: Double = 0.0 + var interferedBurst: Double = 0.0 var cpuUsage: Double = 0.0 var cpuDemand: Double = 0.0 var numberOfDeployedImages: Int = 0 diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 68631dee..79af88fe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -55,8 +55,7 @@ public interface ExperimentMonitor : AutoCloseable { powerDraw: Double, numberOfDeployedImages: Int, host: Host - ) { - } + ) {} /** * This method is invoked for a provisioner event. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index bfdf5f3e..d314c6f5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -75,10 +75,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: 5 * 60 * 1000L, host, numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, + requestedBurst.toLong(), + grantedBurst.toLong(), + overcommissionedBurst.toLong(), + interferedBurst.toLong(), cpuUsage, cpuDemand, powerDraw, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a3300b71..9b98b329 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,9 +120,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(220346369672, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(206667809431, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(1151611104, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -160,9 +160,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -204,10 +204,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(13910799, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -256,9 +256,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(25412073100, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(23695061847, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(368502468, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -300,10 +300,10 @@ class CapelinIntegrationTest { numberOfDeployedImages: Int, host: Host, ) { - totalRequestedBurst += requestedBurst - totalGrantedBurst += grantedBurst - totalOvercommissionedBurst += overcommissionedBurst - totalInterferedBurst += interferedBurst + totalRequestedBurst += requestedBurst.toLong() + totalGrantedBurst += grantedBurst.toLong() + totalOvercommissionedBurst += overcommissionedBurst.toLong() + totalInterferedBurst += interferedBurst.toLong() } override fun close() {} diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index d8f92155..0873aac9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,8 +22,9 @@ package org.opendc.experiments.tf20.core +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine @@ -67,23 +68,28 @@ public class SimTFDevice( SimplePowerDriver(powerModel) ) + /** + * The identifier of a device. + */ + private val deviceId = AttributeKey.stringKey("device.id") + /** * The usage of the device. */ - private val _usage = meter.doubleValueRecorderBuilder("device.usage") + private val _usage = meter.histogramBuilder("device.usage") .setDescription("The amount of device resources used") .setUnit("MHz") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The power draw of the device. */ - private val _power = meter.doubleValueRecorderBuilder("device.power") + private val _power = meter.histogramBuilder("device.power") .setDescription("The power draw of the device") .setUnit("W") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The workload that will be run by the device. -- cgit v1.2.3 From bb6066e1cecc55a50ac29da200bf3beba1ddd80b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 18:16:20 +0200 Subject: fix(capelin): Eliminate unnecessary double to long conversions This change eliminates the unnecessary conversions from double to long in the Capelin metric processing code. --- .../capelin/monitor/ExperimentMetricExporter.kt | 12 ++--- .../capelin/monitor/ExperimentMonitor.kt | 16 +++--- .../capelin/monitor/ParquetExperimentMonitor.kt | 24 ++++----- .../experiments/capelin/CapelinIntegrationTest.kt | 60 +++++++++++----------- 4 files changed, 56 insertions(+), 56 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 16358817..be94593c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -65,12 +65,12 @@ public class ExperimentMetricExporter( for ((id, hostMetric) in hostMetrics) { val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) - monitor.reportHostSlice( + monitor.reportHostData( clock.millis(), - (hostMetric.requestedBurst - lastHostMetric.requestedBurst).toLong(), - (hostMetric.grantedBurst - lastHostMetric.grantedBurst).toLong(), - (hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst).toLong(), - (hostMetric.interferedBurst - lastHostMetric.interferedBurst).toLong(), + hostMetric.requestedBurst - lastHostMetric.requestedBurst, + hostMetric.grantedBurst - lastHostMetric.grantedBurst, + hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst, + hostMetric.interferedBurst - lastHostMetric.interferedBurst, hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, @@ -141,7 +141,7 @@ public class ExperimentMetricExporter( val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - monitor.reportProvisionerMetrics( + monitor.reportServiceData( clock.millis(), hosts, availableHosts, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 79af88fe..9a4aec35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -44,23 +44,23 @@ public interface ExperimentMonitor : AutoCloseable { /** * This method is invoked for a host for each slice that is finishes. */ - public fun reportHostSlice( + public fun reportHostData( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host ) {} /** - * This method is invoked for a provisioner event. + * This method is invoked for reporting service data. */ - public fun reportProvisionerMetrics( + public fun reportServiceData( time: Long, totalHostCount: Int, availableHostCount: Int, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index d314c6f5..83351c41 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -57,16 +57,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: logger.debug { "Host ${host.uid} changed state $newState [$time]" } } - override fun reportHostSlice( + override fun reportHostData( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host ) { hostWriter.write( @@ -74,11 +74,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: time, 5 * 60 * 1000L, host, - numberOfDeployedImages, - requestedBurst.toLong(), - grantedBurst.toLong(), - overcommissionedBurst.toLong(), - interferedBurst.toLong(), + instanceCount, + totalWork.toLong(), + grantedWork.toLong(), + overcommittedWork.toLong(), + interferedWork.toLong(), cpuUsage, cpuDemand, powerDraw, @@ -87,7 +87,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: ) } - override fun reportProvisionerMetrics( + override fun reportServiceData( time: Long, totalHostCount: Int, availableHostCount: Int, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 9b98b329..8008c944 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,10 +120,10 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(220346369672, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(206667809431, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } + { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, + { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, + { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, + { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } } ) } @@ -160,10 +160,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -204,10 +204,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(13910799, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -256,10 +256,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(25412073100, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(23695061847, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(368502468, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -283,27 +283,27 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ExperimentMonitor { - var totalRequestedBurst = 0L - var totalGrantedBurst = 0L - var totalOvercommissionedBurst = 0L - var totalInterferedBurst = 0L + var totalWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + var totalInterferedWork = 0L - override fun reportHostSlice( + override fun reportHostData( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host, ) { - totalRequestedBurst += requestedBurst.toLong() - totalGrantedBurst += grantedBurst.toLong() - totalOvercommissionedBurst += overcommissionedBurst.toLong() - totalInterferedBurst += interferedBurst.toLong() + this.totalWork += totalWork.toLong() + totalGrantedWork += grantedWork.toLong() + totalOvercommittedWork += overcommittedWork.toLong() + totalInterferedWork += interferedWork.toLong() } override fun close() {} -- cgit v1.2.3 From e5b79b18dab4f2874f3c5730b7e599dc74573c8d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 18:24:55 +0200 Subject: refactor(capelin): Simplify metric extraction for monitor This change updates the metric exporter logic in the Capelin module and reduces the number of allocations by looping directly over the collection of metrics. --- .../experiments/capelin/ExperimentHelpers.kt | 38 ++++++--- .../capelin/monitor/ExperimentMetricExporter.kt | 91 +++++++++++----------- 2 files changed, 74 insertions(+), 55 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 4cffb8d3..7f428b2a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,6 +24,7 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -208,24 +209,41 @@ class ComputeMetrics { var runningVms: Int = 0 var unscheduledVms: Int = 0 var finishedVms: Int = 0 + var hosts: Int = 0 + var availableHosts = 0 } /** * Collect the metrics of the compute service. */ fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + return extractComputeMetrics(metricProducer.collectAllMetrics()) +} + +/** + * Extract an [ComputeMetrics] object from the specified list of metric data. + */ +internal fun extractComputeMetrics(metrics: Collection): ComputeMetrics { val res = ComputeMetrics() - try { - // Hack to extract metrics from OpenTelemetry SDK - res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - } catch (cause: Throwable) { - logger.warn(cause) { "Failed to collect metrics" } + for (metric in metrics) { + val points = metric.longSumData.points + + if (points.isEmpty()) { + continue + } + + val value = points.first().value.toInt() + when (metric.name) { + "servers.submitted" -> res.submittedVms = value + "servers.waiting" -> res.queuedVms = value + "servers.unscheduled" -> res.unscheduledVms = value + "servers.active" -> res.runningVms = value + "servers.finished" -> res.finishedVms = value + "hosts.total" -> res.hosts = value + "hosts.available" -> res.availableHosts = value + } } + return res } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index be94593c..e9c817de 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.compute.service.driver.Host +import org.opendc.experiments.capelin.extractComputeMetrics import java.time.Clock /** @@ -37,44 +38,50 @@ public class ExperimentMetricExporter( private val clock: Clock, private val hosts: Map ) : MetricExporter { - private val hostKey = ResourceAttributes.HOST_ID override fun export(metrics: Collection): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - reportHostMetrics(metricsByName) - reportProvisionerMetrics(metricsByName) - return CompletableResultCode.ofSuccess() + return try { + reportHostMetrics(metrics) + reportProvisionerMetrics(metrics) + CompletableResultCode.ofSuccess() + } catch (e: Throwable) { + CompletableResultCode.ofFailure() + } } private var lastHostMetrics: Map = emptyMap() private val hostMetricsSingleton = HostMetrics() - private fun reportHostMetrics(metrics: Map) { + private fun reportHostMetrics(metrics: Collection) { val hostMetrics = mutableMapOf() hosts.mapValuesTo(hostMetrics) { HostMetrics() } - mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v } - mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v } - mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v } - mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v } - mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v } - mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v } - mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v } - mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() } + for (metric in metrics) { + when (metric.name) { + "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } + "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } + "power.usage" -> mapDoubleGauge(metric, hostMetrics) { m, v -> m.powerDraw = v } + "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } + "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } + "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } + "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } + "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } + } + } for ((id, hostMetric) in hostMetrics) { val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) monitor.reportHostData( clock.millis(), - hostMetric.requestedBurst - lastHostMetric.requestedBurst, - hostMetric.grantedBurst - lastHostMetric.grantedBurst, - hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst, - hostMetric.interferedBurst - lastHostMetric.interferedBurst, + hostMetric.totalWork - lastHostMetric.totalWork, + hostMetric.grantedWork - lastHostMetric.grantedWork, + hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, + hostMetric.interferedWork - lastHostMetric.interferedWork, hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, - hostMetric.numberOfDeployedImages, + hostMetric.instanceCount, host ) } @@ -82,10 +89,10 @@ public class ExperimentMetricExporter( lastHostMetrics = hostMetrics } - private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSummaryData?.points ?: emptyList() + private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data.doubleSummaryData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -99,7 +106,7 @@ public class ExperimentMetricExporter( private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleGaugeData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -111,7 +118,7 @@ public class ExperimentMetricExporter( private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -123,7 +130,7 @@ public class ExperimentMetricExporter( private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -132,35 +139,29 @@ public class ExperimentMetricExporter( } } - private fun reportProvisionerMetrics(metrics: Map) { - val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + private fun reportProvisionerMetrics(metrics: Collection) { + val res = extractComputeMetrics(metrics) monitor.reportServiceData( clock.millis(), - hosts, - availableHosts, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms + res.hosts, + res.availableHosts, + res.submittedVms, + res.runningVms, + res.finishedVms, + res.queuedVms, + res.unscheduledVms ) } private class HostMetrics { - var requestedBurst: Double = 0.0 - var grantedBurst: Double = 0.0 - var overcommissionedBurst: Double = 0.0 - var interferedBurst: Double = 0.0 + var totalWork: Double = 0.0 + var grantedWork: Double = 0.0 + var overcommittedWork: Double = 0.0 + var interferedWork: Double = 0.0 var cpuUsage: Double = 0.0 var cpuDemand: Double = 0.0 - var numberOfDeployedImages: Int = 0 + var instanceCount: Int = 0 var powerDraw: Double = 0.0 } -- cgit v1.2.3