From 652b8691143f6e65e192de6b940fbfa0a50784e2 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Tue, 4 May 2021 14:11:23 +0200 Subject: exp: Fix aggregation of power draw This change fixes the aggregation of the power draw metric. Previously, if the power draw did not change between collection cycles, the power draw would be reported as zero. This change uses OpenTelemetry Views to collect the latest value of the power draw each cycle. --- .../experiments/capelin/ExperimentHelpers.kt | 26 +++++++++++++++++++++- .../org/opendc/experiments/capelin/Portfolio.kt | 9 +------- .../capelin/monitor/ExperimentMetricExporter.kt | 14 +++++++++++- .../experiments/capelin/CapelinIntegrationTest.kt | 14 ++---------- 4 files changed, 41 insertions(+), 22 deletions(-) (limited to 'opendc-experiments') diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 763234f8..0fbb7280 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -23,7 +23,12 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory +import io.opentelemetry.sdk.metrics.common.InstrumentType import io.opentelemetry.sdk.metrics.export.MetricProducer +import io.opentelemetry.sdk.metrics.view.InstrumentSelector +import io.opentelemetry.sdk.metrics.view.View import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging @@ -46,9 +51,9 @@ import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.failures.CorrelatedFaultInjector import org.opendc.simulator.failures.FaultInjector import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader +import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.time.Clock -import kotlin.coroutines.coroutineContext import kotlin.coroutines.resume import kotlin.math.ln import kotlin.math.max @@ -295,3 +300,22 @@ public suspend fun processTrace( client.close() } } + +/** + * Create a [MeterProvider] instance for the experiment. + */ +public fun createMeterProvider(clock: Clock): MeterProvider { + val powerSelector = InstrumentSelector.builder() + .setInstrumentNameRegex("power\\.usage") + .setInstrumentType(InstrumentType.VALUE_RECORDER) + .build() + val powerView = View.builder() + .setAggregatorFactory(AggregatorFactory.lastValue()) + .build() + + return SdkMeterProvider + .builder() + .setClock(clock.toOtelClock()) + .registerView(powerSelector, powerView) + .build() +} diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt index b969366c..fc6f79d3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/Portfolio.kt @@ -22,8 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancel @@ -45,7 +43,6 @@ import org.opendc.format.trace.PerformanceInterferenceModelReader import org.opendc.harness.dsl.Experiment import org.opendc.harness.dsl.anyOf import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock import java.io.File import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -123,11 +120,7 @@ public abstract class Portfolio(name: String) : Experiment(name) { val chan = Channel(Channel.CONFLATED) val allocationPolicy = createComputeScheduler(seeder) - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - + val meterProvider = createMeterProvider(clock) val workload = workload val workloadNames = if (workload is CompositeWorkload) { workload.workloads.map { it.name } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 5f8002e2..54ab3b5b 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -55,7 +55,7 @@ public class ExperimentMetricExporter( m.cpuUsage = v } - mapDoubleSummary(metrics["power.usage"], hostMetrics) { m, v -> + mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v } @@ -110,6 +110,18 @@ public class ExperimentMetricExporter( } } + private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleGaugeData?.points ?: emptyList() + for (point in points) { + val uid = point.labels["host"] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 4cb50ab9..2d5cc68c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -22,8 +22,6 @@ package org.opendc.experiments.capelin -import io.opentelemetry.api.metrics.MeterProvider -import io.opentelemetry.sdk.metrics.SdkMeterProvider import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel @@ -45,7 +43,6 @@ import org.opendc.format.environment.sc20.Sc20ClusterEnvironmentReader import org.opendc.format.trace.TraceReader import org.opendc.simulator.compute.workload.SimWorkload import org.opendc.simulator.core.runBlockingSimulation -import org.opendc.telemetry.sdk.toOtelClock import java.io.File /** @@ -78,11 +75,7 @@ class CapelinIntegrationTest { val environmentReader = createTestEnvironmentReader() lateinit var monitorResults: ComputeMetrics - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() - + val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> val failureDomain = if (failures) { println("ENABLING failures") @@ -138,10 +131,7 @@ class CapelinIntegrationTest { val traceReader = createTestTraceReader(0.5, seed) val environmentReader = createTestEnvironmentReader("single") - val meterProvider: MeterProvider = SdkMeterProvider - .builder() - .setClock(clock.toOtelClock()) - .build() + val meterProvider = createMeterProvider(clock) withComputeService(clock, meterProvider, environmentReader, allocationPolicy) { scheduler -> withMonitor(monitor, clock, meterProvider as MetricProducer, scheduler) { -- cgit v1.2.3