From f111081627280d4e7e1d7147c56cdce708e32433 Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 14:06:39 +0200 Subject: build: Upgrade to OpenTelemetry 1.5 This change upgrades the OpenTelemetry dependency to version 1.5, which contains various breaking changes in the metrics API. --- gradle/libs.versions.toml | 7 +- .../compute/service/internal/ComputeServiceImpl.kt | 14 ++-- .../opendc-compute-simulator/build.gradle.kts | 1 + .../kotlin/org/opendc/compute/simulator/SimHost.kt | 81 ++++++++++++---------- .../org/opendc/compute/simulator/SimHostTest.kt | 19 +++-- .../opendc-experiments-capelin/build.gradle.kts | 1 + .../experiments/capelin/ExperimentHelpers.kt | 13 ---- .../capelin/monitor/ExperimentMetricExporter.kt | 77 ++++++++++---------- .../capelin/monitor/ExperimentMonitor.kt | 3 +- .../capelin/monitor/ParquetExperimentMonitor.kt | 8 +-- .../experiments/capelin/CapelinIntegrationTest.kt | 34 ++++----- .../opendc/experiments/tf20/core/SimTFDevice.kt | 16 +++-- .../org/opendc/faas/service/FunctionObject.kt | 44 +++++++----- .../faas/service/internal/FaaSServiceImpl.kt | 6 +- .../compute/kernel/SimAbstractHypervisor.kt | 6 ++ .../simulator/compute/kernel/SimHypervisor.kt | 6 ++ .../org/opendc/web/runner/WebExperimentMonitor.kt | 8 +-- .../service/internal/WorkflowServiceImpl.kt | 12 ++-- 18 files changed, 185 insertions(+), 171 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a15fa45d..4f2ec58c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,8 +3,9 @@ junit-jupiter = "5.7.2" junit-platform = "1.7.2" slf4j = "1.7.32" log4j = "2.14.1" -opentelemetry-main = "1.4.1" -opentelemetry-metrics = "1.4.1-alpha" +opentelemetry-main = "1.5.0" +opentelemetry-metrics = "1.5.0-alpha" +opentelemetry-semconv = "1.5.0-alpha" hadoop = "3.3.0" ktor = "1.6.2" jackson = "2.12.4" @@ -23,6 +24,8 @@ opentelemetry-api-main = { module = "io.opentelemetry:opentelemetry-api", versio opentelemetry-sdk-main = { module = "io.opentelemetry:opentelemetry-sdk", version.ref = "opentelemetry-main" } opentelemetry-api-metrics = { module = "io.opentelemetry:opentelemetry-api-metrics", version.ref = "opentelemetry-metrics" } opentelemetry-sdk-metrics = { module = "io.opentelemetry:opentelemetry-sdk-metrics", version.ref = "opentelemetry-metrics" } +opentelemetry-semconv = { module = "io.opentelemetry:opentelemetry-semconv", version.ref = "opentelemetry-semconv" } + # Testing junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" } diff --git a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt index e7807177..d7a7e8f8 100644 --- a/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt +++ b/opendc-compute/opendc-compute-service/src/main/kotlin/org/opendc/compute/service/internal/ComputeServiceImpl.kt @@ -106,7 +106,7 @@ internal class ComputeServiceImpl( /** * The number of servers that have been submitted to the service for provisioning. */ - private val _submittedServers = meter.longCounterBuilder("servers.submitted") + private val _submittedServers = meter.counterBuilder("servers.submitted") .setDescription("Number of start requests") .setUnit("1") .build() @@ -114,7 +114,7 @@ internal class ComputeServiceImpl( /** * The number of servers that failed to be scheduled. */ - private val _unscheduledServers = meter.longCounterBuilder("servers.unscheduled") + private val _unscheduledServers = meter.counterBuilder("servers.unscheduled") .setDescription("Number of unscheduled servers") .setUnit("1") .build() @@ -122,7 +122,7 @@ internal class ComputeServiceImpl( /** * The number of servers that are waiting to be provisioned. */ - private val _waitingServers = meter.longUpDownCounterBuilder("servers.waiting") + private val _waitingServers = meter.upDownCounterBuilder("servers.waiting") .setDescription("Number of servers waiting to be provisioned") .setUnit("1") .build() @@ -130,7 +130,7 @@ internal class ComputeServiceImpl( /** * The number of servers that are waiting to be provisioned. */ - private val _runningServers = meter.longUpDownCounterBuilder("servers.active") + private val _runningServers = meter.upDownCounterBuilder("servers.active") .setDescription("Number of servers currently running") .setUnit("1") .build() @@ -138,7 +138,7 @@ internal class ComputeServiceImpl( /** * The number of servers that have finished running. */ - private val _finishedServers = meter.longCounterBuilder("servers.finished") + private val _finishedServers = meter.counterBuilder("servers.finished") .setDescription("Number of servers that finished running") .setUnit("1") .build() @@ -146,7 +146,7 @@ internal class ComputeServiceImpl( /** * The number of hosts registered at the compute service. */ - private val _hostCount = meter.longUpDownCounterBuilder("hosts.total") + private val _hostCount = meter.upDownCounterBuilder("hosts.total") .setDescription("Number of hosts") .setUnit("1") .build() @@ -154,7 +154,7 @@ internal class ComputeServiceImpl( /** * The number of available hosts registered at the compute service. */ - private val _availableHostCount = meter.longUpDownCounterBuilder("hosts.available") + private val _availableHostCount = meter.upDownCounterBuilder("hosts.available") .setDescription("Number of available hosts") .setUnit("1") .build() diff --git a/opendc-compute/opendc-compute-simulator/build.gradle.kts b/opendc-compute/opendc-compute-simulator/build.gradle.kts index b31a2114..c5a9e668 100644 --- a/opendc-compute/opendc-compute-simulator/build.gradle.kts +++ b/opendc-compute/opendc-compute-simulator/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorCompute) api(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcUtils) + implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) testImplementation(projects.opendcSimulator.opendcSimulatorCore) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index be771f6d..546584b6 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -22,8 +22,9 @@ package org.opendc.compute.simulator +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import kotlinx.coroutines.* import mu.KotlinLogging import org.opendc.compute.api.Flavor @@ -107,15 +108,13 @@ public class SimHost( cpuUsage: Double, cpuDemand: Double ) { - - _batch.put(_cpuWork, requestedWork.toDouble()) - _batch.put(_cpuWorkGranted, grantedWork.toDouble()) - _batch.put(_cpuWorkOvercommit, overcommittedWork.toDouble()) - _batch.put(_cpuWorkInterference, interferedWork.toDouble()) - _batch.put(_cpuUsage, cpuUsage) - _batch.put(_cpuDemand, cpuDemand) - _batch.put(_cpuPower, machine.psu.powerDraw) - _batch.record() + _totalWork.add(requestedWork.toDouble()) + _grantedWork.add(grantedWork.toDouble()) + _overcommittedWork.add(overcommittedWork.toDouble()) + _interferedWork.add(interferedWork.toDouble()) + _cpuDemand.record(cpuDemand) + _cpuUsage.record(cpuUsage) + _powerUsage.record(machine.psu.powerDraw) } } ) @@ -135,86 +134,92 @@ public class SimHost( field = value } - override val model: HostModel = HostModel(model.cpus.size, model.memory.map { it.size }.sum()) + override val model: HostModel = HostModel(model.cpus.size, model.memory.sumOf { it.size }) /** - * The number of guests on the host. + * The total number of guests. */ - private val _guests = meter.longUpDownCounterBuilder("guests.total") + private val _guests = meter.upDownCounterBuilder("guests.total") .setDescription("Number of guests") .setUnit("1") .build() - .bind(Labels.of("host", uid.toString())) + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** * The number of active guests on the host. */ - private val _activeGuests = meter.longUpDownCounterBuilder("guests.active") + private val _activeGuests = meter.upDownCounterBuilder("guests.active") .setDescription("Number of active guests") .setUnit("1") .build() - .bind(Labels.of("host", uid.toString())) + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The CPU usage on the host. + * The CPU demand of the host. */ - private val _cpuUsage = meter.doubleValueRecorderBuilder("cpu.usage") - .setDescription("The amount of CPU resources used by the host") + private val _cpuDemand = meter.histogramBuilder("cpu.demand") + .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") .setUnit("MHz") .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The CPU demand on the host. + * The CPU usage of the host. */ - private val _cpuDemand = meter.doubleValueRecorderBuilder("cpu.demand") - .setDescription("The amount of CPU resources the guests would use if there were no CPU contention or CPU limits") + private val _cpuUsage = meter.histogramBuilder("cpu.usage") + .setDescription("The amount of CPU resources used by the host") .setUnit("MHz") .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The requested work for the CPU. + * The power usage of the host. */ - private val _cpuPower = meter.doubleValueRecorderBuilder("power.usage") + private val _powerUsage = meter.histogramBuilder("power.usage") .setDescription("The amount of power used by the CPU") .setUnit("W") .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The requested work for the CPU. + * The total amount of work supplied to the CPU. */ - private val _cpuWork = meter.doubleValueRecorderBuilder("cpu.work.total") + private val _totalWork = meter.counterBuilder("cpu.work.total") .setDescription("The amount of work supplied to the CPU") .setUnit("1") + .ofDoubles() .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The work actually performed by the CPU. + * The work performed by the CPU. */ - private val _cpuWorkGranted = meter.doubleValueRecorderBuilder("cpu.work.granted") + private val _grantedWork = meter.counterBuilder("cpu.work.granted") .setDescription("The amount of work performed by the CPU") .setUnit("1") + .ofDoubles() .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The work that could not be performed by the CPU due to overcommitting resource. + * The amount not performed by the CPU due to overcommitment. */ - private val _cpuWorkOvercommit = meter.doubleValueRecorderBuilder("cpu.work.overcommit") + private val _overcommittedWork = meter.counterBuilder("cpu.work.overcommit") .setDescription("The amount of work not performed by the CPU due to overcommitment") .setUnit("1") + .ofDoubles() .build() + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) /** - * The work that could not be performed by the CPU due to interference. + * The amount of work not performed by the CPU due to interference. */ - private val _cpuWorkInterference = meter.doubleValueRecorderBuilder("cpu.work.interference") + private val _interferedWork = meter.counterBuilder("cpu.work.interference") .setDescription("The amount of work not performed by the CPU due to interference") .setUnit("1") + .ofDoubles() .build() - - /** - * The batch recorder used to record multiple metrics atomically. - */ - private val _batch = meter.newBatchRecorder("host", uid.toString()) + .bind(Attributes.of(ResourceAttributes.HOST_ID, uid.toString())) init { // Launch hypervisor onto machine @@ -273,8 +278,8 @@ public class SimHost( override suspend fun delete(server: Server) { val guest = guests.remove(server) ?: return - guest.terminate() _guests.add(-1) + guest.terminate() } override fun addListener(listener: HostListener) { diff --git a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt index 93a2248a..1ba3a9a1 100644 --- a/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt +++ b/opendc-compute/opendc-compute-simulator/src/test/kotlin/org/opendc/compute/simulator/SimHostTest.kt @@ -133,17 +133,14 @@ internal class SimHostTest { object : MetricExporter { override fun export(metrics: Collection): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } - val totalWork = metricsByName["cpu.work.total"] - if (totalWork != null) { - requestedWork += totalWork.doubleSummaryData.points.first().sum.toLong() + metricsByName["cpu.work.total"]?.let { + requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } - val grantedWorkCycle = metricsByName["cpu.work.granted"] - if (grantedWorkCycle != null) { - grantedWork += grantedWorkCycle.doubleSummaryData.points.first().sum.toLong() + metricsByName["cpu.work.granted"]?.let { + grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } - val overcommittedWorkCycle = metricsByName["cpu.work.overcommit"] - if (overcommittedWorkCycle != null) { - overcommittedWork += overcommittedWorkCycle.doubleSummaryData.points.first().sum.toLong() + metricsByName["cpu.work.overcommit"]?.let { + overcommittedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } return CompletableResultCode.ofSuccess() } @@ -236,10 +233,10 @@ internal class SimHostTest { override fun export(metrics: Collection): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } metricsByName["cpu.work.total"]?.let { - requestedWork += it.doubleSummaryData.points.first().sum.toLong() + requestedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } metricsByName["cpu.work.granted"]?.let { - grantedWork += it.doubleSummaryData.points.first().sum.toLong() + grantedWork = it.doubleSumData.points.sumOf { point -> point.value }.toLong() } return CompletableResultCode.ofSuccess() } diff --git a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts index 324cae3e..53643aba 100644 --- a/opendc-experiments/opendc-experiments-capelin/build.gradle.kts +++ b/opendc-experiments/opendc-experiments-capelin/build.gradle.kts @@ -37,6 +37,7 @@ dependencies { implementation(projects.opendcSimulator.opendcSimulatorFailures) implementation(projects.opendcCompute.opendcComputeSimulator) implementation(projects.opendcTelemetry.opendcTelemetrySdk) + implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) implementation(libs.config) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index d7df4454..4cffb8d3 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,11 +24,7 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider -import io.opentelemetry.sdk.metrics.aggregator.AggregatorFactory -import io.opentelemetry.sdk.metrics.common.InstrumentType import io.opentelemetry.sdk.metrics.export.MetricProducer -import io.opentelemetry.sdk.metrics.view.InstrumentSelector -import io.opentelemetry.sdk.metrics.view.View import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import mu.KotlinLogging @@ -298,18 +294,9 @@ suspend fun processTrace( * Create a [MeterProvider] instance for the experiment. */ fun createMeterProvider(clock: Clock): MeterProvider { - val powerSelector = InstrumentSelector.builder() - .setInstrumentNameRegex("power\\.usage") - .setInstrumentType(InstrumentType.VALUE_RECORDER) - .build() - val powerView = View.builder() - .setAggregatorFactory(AggregatorFactory.lastValue()) - .build() - return SdkMeterProvider .builder() .setClock(clock.toOtelClock()) - .registerView(powerSelector, powerView) .build() } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 7fb2f83c..16358817 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -22,10 +22,10 @@ package org.opendc.experiments.capelin.monitor -import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.compute.service.driver.Host import java.time.Clock @@ -37,7 +37,7 @@ public class ExperimentMetricExporter( private val clock: Clock, private val hosts: Map ) : MetricExporter { - private val hostKey = AttributeKey.stringKey("host") + private val hostKey = ResourceAttributes.HOST_ID override fun export(metrics: Collection): CompletableResultCode { val metricsByName = metrics.associateBy { it.name } @@ -46,50 +46,31 @@ public class ExperimentMetricExporter( return CompletableResultCode.ofSuccess() } + private var lastHostMetrics: Map = emptyMap() + private val hostMetricsSingleton = HostMetrics() + private fun reportHostMetrics(metrics: Map) { val hostMetrics = mutableMapOf() hosts.mapValuesTo(hostMetrics) { HostMetrics() } - mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> - m.cpuDemand = v - } - - mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> - m.cpuUsage = v - } - - mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> - m.powerDraw = v - } - - mapDoubleSummary(metrics["cpu.work.total"], hostMetrics) { m, v -> - m.requestedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.granted"], hostMetrics) { m, v -> - m.grantedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> - m.overcommissionedBurst = v.toLong() - } - - mapDoubleSummary(metrics["cpu.work.interference"], hostMetrics) { m, v -> - m.interferedBurst = v.toLong() - } - - mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> - m.numberOfDeployedImages = v.toInt() - } + mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v } + mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v } + mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v } + mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v } + mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v } + mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v } + mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v } + mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() } for ((id, hostMetric) in hostMetrics) { + val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) monitor.reportHostSlice( clock.millis(), - hostMetric.requestedBurst, - hostMetric.grantedBurst, - hostMetric.overcommissionedBurst, - hostMetric.interferedBurst, + (hostMetric.requestedBurst - lastHostMetric.requestedBurst).toLong(), + (hostMetric.grantedBurst - lastHostMetric.grantedBurst).toLong(), + (hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst).toLong(), + (hostMetric.interferedBurst - lastHostMetric.interferedBurst).toLong(), hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, @@ -97,6 +78,8 @@ public class ExperimentMetricExporter( host ) } + + lastHostMetrics = hostMetrics } private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { @@ -137,6 +120,18 @@ public class ExperimentMetricExporter( } } + private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data?.doubleSumData?.points ?: emptyList() + for (point in points) { + val uid = point.attributes[hostKey] + val hostMetric = hostMetrics[uid] + + if (hostMetric != null) { + block(hostMetric, point.value) + } + } + } + private fun reportProvisionerMetrics(metrics: Map) { val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 @@ -159,10 +154,10 @@ public class ExperimentMetricExporter( } private class HostMetrics { - var requestedBurst: Long = 0 - var grantedBurst: Long = 0 - var overcommissionedBurst: Long = 0 - var interferedBurst: Long = 0 + var requestedBurst: Double = 0.0 + var grantedBurst: Double = 0.0 + var overcommissionedBurst: Double = 0.0 + var interferedBurst: Double = 0.0 var cpuUsage: Double = 0.0 var cpuDemand: Double = 0.0 var numberOfDeployedImages: Int = 0 diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 68631dee..79af88fe 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -55,8 +55,7 @@ public interface ExperimentMonitor : AutoCloseable { powerDraw: Double, numberOfDeployedImages: Int, host: Host - ) { - } + ) {} /** * This method is invoked for a provisioner event. diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index bfdf5f3e..d314c6f5 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -75,10 +75,10 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: 5 * 60 * 1000L, host, numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, + requestedBurst.toLong(), + grantedBurst.toLong(), + overcommissionedBurst.toLong(), + interferedBurst.toLong(), cpuUsage, cpuDemand, powerDraw, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index a3300b71..9b98b329 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,9 +120,9 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(219751355711, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(206351165081, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(1148906334, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, + { assertEquals(220346369672, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, + { assertEquals(206667809431, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, + { assertEquals(1151611104, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } ) } @@ -160,9 +160,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -204,10 +204,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(37954956986, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34840774250, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971076806, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(13885404, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(13910799, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -256,9 +256,9 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(25336984869, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(23668547517, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(368151656, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, + { assertEquals(25412073100, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, + { assertEquals(23695061847, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, + { assertEquals(368502468, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } ) } @@ -300,10 +300,10 @@ class CapelinIntegrationTest { numberOfDeployedImages: Int, host: Host, ) { - totalRequestedBurst += requestedBurst - totalGrantedBurst += grantedBurst - totalOvercommissionedBurst += overcommissionedBurst - totalInterferedBurst += interferedBurst + totalRequestedBurst += requestedBurst.toLong() + totalGrantedBurst += grantedBurst.toLong() + totalOvercommissionedBurst += overcommissionedBurst.toLong() + totalInterferedBurst += interferedBurst.toLong() } override fun close() {} diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index d8f92155..0873aac9 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -22,8 +22,9 @@ package org.opendc.experiments.tf20.core +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels import kotlinx.coroutines.* import org.opendc.simulator.compute.SimBareMetalMachine import org.opendc.simulator.compute.SimMachine @@ -67,23 +68,28 @@ public class SimTFDevice( SimplePowerDriver(powerModel) ) + /** + * The identifier of a device. + */ + private val deviceId = AttributeKey.stringKey("device.id") + /** * The usage of the device. */ - private val _usage = meter.doubleValueRecorderBuilder("device.usage") + private val _usage = meter.histogramBuilder("device.usage") .setDescription("The amount of device resources used") .setUnit("MHz") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The power draw of the device. */ - private val _power = meter.doubleValueRecorderBuilder("device.power") + private val _power = meter.histogramBuilder("device.power") .setDescription("The power draw of the device") .setUnit("W") .build() - .bind(Labels.of("device", uid.toString())) + .bind(Attributes.of(deviceId, uid.toString())) /** * The workload that will be run by the device. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt index 7c7621b8..a1cb1dbf 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/FunctionObject.kt @@ -22,11 +22,12 @@ package org.opendc.faas.service +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes import io.opentelemetry.api.metrics.BoundLongCounter +import io.opentelemetry.api.metrics.BoundLongHistogram import io.opentelemetry.api.metrics.BoundLongUpDownCounter -import io.opentelemetry.api.metrics.BoundLongValueRecorder import io.opentelemetry.api.metrics.Meter -import io.opentelemetry.api.metrics.common.Labels import org.opendc.faas.service.deployer.FunctionInstance import java.util.* @@ -41,77 +42,84 @@ public class FunctionObject( labels: Map, meta: Map ) : AutoCloseable { + /** + * The function identifier attached to the metrics. + */ + private val functionId = AttributeKey.stringKey("function") + /** * The total amount of function invocations received by the function. */ - public val invocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.total") + public val invocations: BoundLongCounter = meter.counterBuilder("function.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The amount of function invocations that could be handled directly. */ - public val timelyInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.warm") + public val timelyInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The amount of function invocations that were delayed due to function deployment. */ - public val delayedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.cold") + public val delayedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The amount of function invocations that failed. */ - public val failedInvocations: BoundLongCounter = meter.longCounterBuilder("function.invocations.failed") + public val failedInvocations: BoundLongCounter = meter.counterBuilder("function.invocations.failed") .setDescription("Number of function invocations that failed") .setUnit("1") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The amount of instances for this function. */ - public val activeInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.active") + public val activeInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.active") .setDescription("Number of active function instances") .setUnit("1") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The amount of idle instances for this function. */ - public val idleInstances: BoundLongUpDownCounter = meter.longUpDownCounterBuilder("function.instances.idle") + public val idleInstances: BoundLongUpDownCounter = meter.upDownCounterBuilder("function.instances.idle") .setDescription("Number of idle function instances") .setUnit("1") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The time that the function waited. */ - public val waitTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.wait") + public val waitTime: BoundLongHistogram = meter.histogramBuilder("function.time.wait") + .ofLongs() .setDescription("Time the function has to wait before being started") .setUnit("ms") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The time that the function was running. */ - public val activeTime: BoundLongValueRecorder = meter.longValueRecorderBuilder("function.time.active") + public val activeTime: BoundLongHistogram = meter.histogramBuilder("function.time.active") + .ofLongs() .setDescription("Time the function was running") .setUnit("ms") .build() - .bind(Labels.of("function", uid.toString())) + .bind(Attributes.of(functionId, uid.toString())) /** * The instances associated with this function. diff --git a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt index b169436f..ccf9a5d9 100644 --- a/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt +++ b/opendc-faas/opendc-faas-service/src/main/kotlin/org/opendc/faas/service/internal/FaaSServiceImpl.kt @@ -93,7 +93,7 @@ internal class FaaSServiceImpl( /** * The total amount of function invocations received by the service. */ - private val _invocations = meter.longCounterBuilder("service.invocations.total") + private val _invocations = meter.counterBuilder("service.invocations.total") .setDescription("Number of function invocations") .setUnit("1") .build() @@ -101,7 +101,7 @@ internal class FaaSServiceImpl( /** * The amount of function invocations that could be handled directly. */ - private val _timelyInvocations = meter.longCounterBuilder("service.invocations.warm") + private val _timelyInvocations = meter.counterBuilder("service.invocations.warm") .setDescription("Number of function invocations handled directly") .setUnit("1") .build() @@ -109,7 +109,7 @@ internal class FaaSServiceImpl( /** * The amount of function invocations that were delayed due to function deployment. */ - private val _delayedInvocations = meter.longCounterBuilder("service.invocations.cold") + private val _delayedInvocations = meter.counterBuilder("service.invocations.cold") .setDescription("Number of function invocations that are delayed") .setUnit("1") .build() diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt index d287312f..6002270a 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimAbstractHypervisor.kt @@ -59,6 +59,12 @@ public abstract class SimAbstractHypervisor( override val vms: Set get() = _vms + /** + * The resource counters associated with the hypervisor. + */ + public override val counters: SimResourceCounters + get() = switch.counters + /** * The scaling governors attached to the physical CPUs backing this hypervisor. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index e398ab36..d3996914 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -25,6 +25,7 @@ package org.opendc.simulator.compute.kernel import org.opendc.simulator.compute.SimMachine import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.workload.SimWorkload +import org.opendc.simulator.resources.SimResourceCounters /** * A SimHypervisor facilitates the execution of multiple concurrent [SimWorkload]s, while acting as a single workload @@ -36,6 +37,11 @@ public interface SimHypervisor : SimWorkload { */ public val vms: Set + /** + * The resource counters associated with the hypervisor. + */ + public val counters: SimResourceCounters + /** * Determine whether the specified machine characterized by [model] can fit on this hypervisor at this moment. */ diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt index d4445810..140f067a 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt @@ -45,10 +45,10 @@ public class WebExperimentMonitor : ExperimentMonitor { override fun reportHostSlice( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + requestedBurst: Double, + grantedBurst: Double, + overcommissionedBurst: Double, + interferedBurst: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, diff --git a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt index 32191b8f..5329143d 100644 --- a/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt +++ b/opendc-workflow/opendc-workflow-service/src/main/kotlin/org/opendc/workflow/service/internal/WorkflowServiceImpl.kt @@ -155,7 +155,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have been submitted to the service. */ - private val submittedJobs = meter.longCounterBuilder("jobs.submitted") + private val submittedJobs = meter.counterBuilder("jobs.submitted") .setDescription("Number of submitted jobs") .setUnit("1") .build() @@ -163,7 +163,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningJobs = meter.longUpDownCounterBuilder("jobs.active") + private val runningJobs = meter.upDownCounterBuilder("jobs.active") .setDescription("Number of jobs running") .setUnit("1") .build() @@ -171,7 +171,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedJobs = meter.longCounterBuilder("jobs.finished") + private val finishedJobs = meter.counterBuilder("jobs.finished") .setDescription("Number of jobs that finished running") .setUnit("1") .build() @@ -179,7 +179,7 @@ public class WorkflowServiceImpl( /** * The number of tasks that have been submitted to the service. */ - private val submittedTasks = meter.longCounterBuilder("tasks.submitted") + private val submittedTasks = meter.counterBuilder("tasks.submitted") .setDescription("Number of submitted tasks") .setUnit("1") .build() @@ -187,7 +187,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that are running. */ - private val runningTasks = meter.longUpDownCounterBuilder("tasks.active") + private val runningTasks = meter.upDownCounterBuilder("tasks.active") .setDescription("Number of tasks running") .setUnit("1") .build() @@ -195,7 +195,7 @@ public class WorkflowServiceImpl( /** * The number of jobs that have finished running. */ - private val finishedTasks = meter.longCounterBuilder("tasks.finished") + private val finishedTasks = meter.counterBuilder("tasks.finished") .setDescription("Number of tasks that finished running") .setUnit("1") .build() -- cgit v1.2.3 From e6dd553cf77445083f2c7632bd3b4c3611d76d0a Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 18:15:07 +0200 Subject: fix(simulator): Eliminate unnecessary double to long conversions This change eliminates unnecessary double to long conversions in the simulator. Previously, we used longs to denote the amount of work. However, in the mean time we have switched to doubles in the lower stack. --- .../kotlin/org/opendc/compute/simulator/SimHost.kt | 16 ++++----- .../compute/kernel/SimFairShareHypervisor.kt | 8 ++--- .../simulator/compute/kernel/SimHypervisor.kt | 8 ++--- .../simulator/compute/kernel/SimHypervisorTest.kt | 40 +++++++++++----------- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index 546584b6..dcc525cb 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -101,17 +101,17 @@ public class SimHost( listener = object : SimHypervisor.Listener { override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, + requestedWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { - _totalWork.add(requestedWork.toDouble()) - _grantedWork.add(grantedWork.toDouble()) - _overcommittedWork.add(overcommittedWork.toDouble()) - _interferedWork.add(interferedWork.toDouble()) + _totalWork.add(requestedWork) + _grantedWork.add(grantedWork) + _overcommittedWork.add(overcommittedWork) + _interferedWork.add(interferedWork) _cpuDemand.record(cpuDemand) _cpuUsage.record(cpuUsage) _powerUsage.record(machine.psu.powerDraw) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt index c31b1f6b..3b44292d 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimFairShareHypervisor.kt @@ -77,10 +77,10 @@ public class SimFairShareHypervisor( if (timestamp > lastReport) { listener.onSliceFinish( this@SimFairShareHypervisor, - (counters.demand - lastDemand).toLong(), - (counters.actual - lastActual).toLong(), - (counters.overcommit - lastOvercommit).toLong(), - (counters.interference - lastInterference).toLong(), + counters.demand - lastDemand, + counters.actual - lastActual, + counters.overcommit - lastOvercommit, + counters.interference - lastInterference, lastCpuUsage, lastCpuDemand ) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt index d3996914..af28c346 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/kernel/SimHypervisor.kt @@ -64,10 +64,10 @@ public interface SimHypervisor : SimWorkload { */ public fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, + requestedWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt index afc4c949..918271d1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/kernel/SimHypervisorTest.kt @@ -68,16 +68,16 @@ internal class SimHypervisorTest { @Test fun testOvercommittedSingle() = runBlockingSimulation { val listener = object : SimHypervisor.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L + var totalRequestedWork = 0.0 + var totalGrantedWork = 0.0 + var totalOvercommittedWork = 0.0 override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, + requestedWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { @@ -117,9 +117,9 @@ internal class SimHypervisorTest { machine.close() assertAll( - { assertEquals(1113300, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1023300, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(90000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(1113300.0, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1023300.0, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(90000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, { assertEquals(listOf(0.0, 0.00875, 1.0, 0.0, 0.0571875, 0.0), res) { "VM usage is correct" } }, { assertEquals(1200000, clock.millis()) { "Current time is correct" } } ) @@ -131,16 +131,16 @@ internal class SimHypervisorTest { @Test fun testOvercommittedDual() = runBlockingSimulation { val listener = object : SimHypervisor.Listener { - var totalRequestedWork = 0L - var totalGrantedWork = 0L - var totalOvercommittedWork = 0L + var totalRequestedWork = 0.0 + var totalGrantedWork = 0.0 + var totalOvercommittedWork = 0.0 override fun onSliceFinish( hypervisor: SimHypervisor, - requestedWork: Long, - grantedWork: Long, - overcommittedWork: Long, - interferedWork: Long, + requestedWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double ) { @@ -196,9 +196,9 @@ internal class SimHypervisorTest { yield() assertAll( - { assertEquals(2073600, listener.totalRequestedWork, "Requested Burst does not match") }, - { assertEquals(1053600, listener.totalGrantedWork, "Granted Burst does not match") }, - { assertEquals(1020000, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, + { assertEquals(2073600.0, listener.totalRequestedWork, "Requested Burst does not match") }, + { assertEquals(1053600.0, listener.totalGrantedWork, "Granted Burst does not match") }, + { assertEquals(1020000.0, listener.totalOvercommittedWork, "Overcommissioned Burst does not match") }, { assertEquals(1200000, clock.millis()) } ) } -- cgit v1.2.3 From bb6066e1cecc55a50ac29da200bf3beba1ddd80b Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 18:16:20 +0200 Subject: fix(capelin): Eliminate unnecessary double to long conversions This change eliminates the unnecessary conversions from double to long in the Capelin metric processing code. --- .../capelin/monitor/ExperimentMetricExporter.kt | 12 ++--- .../capelin/monitor/ExperimentMonitor.kt | 16 +++--- .../capelin/monitor/ParquetExperimentMonitor.kt | 24 ++++----- .../experiments/capelin/CapelinIntegrationTest.kt | 60 +++++++++++----------- .../org/opendc/web/runner/WebExperimentMonitor.kt | 24 ++++----- 5 files changed, 68 insertions(+), 68 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index 16358817..be94593c 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -65,12 +65,12 @@ public class ExperimentMetricExporter( for ((id, hostMetric) in hostMetrics) { val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) - monitor.reportHostSlice( + monitor.reportHostData( clock.millis(), - (hostMetric.requestedBurst - lastHostMetric.requestedBurst).toLong(), - (hostMetric.grantedBurst - lastHostMetric.grantedBurst).toLong(), - (hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst).toLong(), - (hostMetric.interferedBurst - lastHostMetric.interferedBurst).toLong(), + hostMetric.requestedBurst - lastHostMetric.requestedBurst, + hostMetric.grantedBurst - lastHostMetric.grantedBurst, + hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst, + hostMetric.interferedBurst - lastHostMetric.interferedBurst, hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, @@ -141,7 +141,7 @@ public class ExperimentMetricExporter( val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - monitor.reportProvisionerMetrics( + monitor.reportServiceData( clock.millis(), hosts, availableHosts, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt index 79af88fe..9a4aec35 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMonitor.kt @@ -44,23 +44,23 @@ public interface ExperimentMonitor : AutoCloseable { /** * This method is invoked for a host for each slice that is finishes. */ - public fun reportHostSlice( + public fun reportHostData( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host ) {} /** - * This method is invoked for a provisioner event. + * This method is invoked for reporting service data. */ - public fun reportProvisionerMetrics( + public fun reportServiceData( time: Long, totalHostCount: Int, availableHostCount: Int, diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt index d314c6f5..83351c41 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ParquetExperimentMonitor.kt @@ -57,16 +57,16 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: logger.debug { "Host ${host.uid} changed state $newState [$time]" } } - override fun reportHostSlice( + override fun reportHostData( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host ) { hostWriter.write( @@ -74,11 +74,11 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: time, 5 * 60 * 1000L, host, - numberOfDeployedImages, - requestedBurst.toLong(), - grantedBurst.toLong(), - overcommissionedBurst.toLong(), - interferedBurst.toLong(), + instanceCount, + totalWork.toLong(), + grantedWork.toLong(), + overcommittedWork.toLong(), + interferedWork.toLong(), cpuUsage, cpuDemand, powerDraw, @@ -87,7 +87,7 @@ public class ParquetExperimentMonitor(base: File, partition: String, bufferSize: ) } - override fun reportProvisionerMetrics( + override fun reportServiceData( time: Long, totalHostCount: Int, availableHostCount: Int, diff --git a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt index 9b98b329..8008c944 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/test/kotlin/org/opendc/experiments/capelin/CapelinIntegrationTest.kt @@ -120,10 +120,10 @@ class CapelinIntegrationTest { { assertEquals(0, monitorResults.runningVms, "All VMs should finish after a run") }, { assertEquals(0, monitorResults.unscheduledVms, "No VM should not be unscheduled") }, { assertEquals(0, monitorResults.queuedVms, "No VM should not be in the queue") }, - { assertEquals(220346369672, monitor.totalRequestedBurst) { "Incorrect requested burst" } }, - { assertEquals(206667809431, monitor.totalGrantedBurst) { "Incorrect granted burst" } }, - { assertEquals(1151611104, monitor.totalOvercommissionedBurst) { "Incorrect overcommitted burst" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Incorrect interfered burst" } } + { assertEquals(220346369753, monitor.totalWork) { "Incorrect requested burst" } }, + { assertEquals(206667809529, monitor.totalGrantedWork) { "Incorrect granted burst" } }, + { assertEquals(1151611104, monitor.totalOvercommittedWork) { "Incorrect overcommitted burst" } }, + { assertEquals(0, monitor.totalInterferedWork) { "Incorrect interfered burst" } } ) } @@ -160,10 +160,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -204,10 +204,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(38051879542, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(34888186396, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(971668973, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(13910799, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(38051879552, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(34888186408, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(971668973, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(13910814, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -256,10 +256,10 @@ class CapelinIntegrationTest { // Note that these values have been verified beforehand assertAll( - { assertEquals(25412073100, monitor.totalRequestedBurst) { "Total requested work incorrect" } }, - { assertEquals(23695061847, monitor.totalGrantedBurst) { "Total granted work incorrect" } }, - { assertEquals(368502468, monitor.totalOvercommissionedBurst) { "Total overcommitted work incorrect" } }, - { assertEquals(0, monitor.totalInterferedBurst) { "Total interfered work incorrect" } } + { assertEquals(25412073109, monitor.totalWork) { "Total requested work incorrect" } }, + { assertEquals(23695061858, monitor.totalGrantedWork) { "Total granted work incorrect" } }, + { assertEquals(368502468, monitor.totalOvercommittedWork) { "Total overcommitted work incorrect" } }, + { assertEquals(0, monitor.totalInterferedWork) { "Total interfered work incorrect" } } ) } @@ -283,27 +283,27 @@ class CapelinIntegrationTest { } class TestExperimentReporter : ExperimentMonitor { - var totalRequestedBurst = 0L - var totalGrantedBurst = 0L - var totalOvercommissionedBurst = 0L - var totalInterferedBurst = 0L + var totalWork = 0L + var totalGrantedWork = 0L + var totalOvercommittedWork = 0L + var totalInterferedWork = 0L - override fun reportHostSlice( + override fun reportHostData( time: Long, - requestedBurst: Long, - grantedBurst: Long, - overcommissionedBurst: Long, - interferedBurst: Long, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host, ) { - totalRequestedBurst += requestedBurst.toLong() - totalGrantedBurst += grantedBurst.toLong() - totalOvercommissionedBurst += overcommissionedBurst.toLong() - totalInterferedBurst += interferedBurst.toLong() + this.totalWork += totalWork.toLong() + totalGrantedWork += grantedWork.toLong() + totalOvercommittedWork += overcommittedWork.toLong() + totalInterferedWork += interferedWork.toLong() } override fun close() {} diff --git a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt index 140f067a..82e2a334 100644 --- a/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt +++ b/opendc-web/opendc-web-runner/src/main/kotlin/org/opendc/web/runner/WebExperimentMonitor.kt @@ -43,16 +43,16 @@ public class WebExperimentMonitor : ExperimentMonitor { logger.debug { "Host ${host.uid} changed state $newState [$time]" } } - override fun reportHostSlice( + override fun reportHostData( time: Long, - requestedBurst: Double, - grantedBurst: Double, - overcommissionedBurst: Double, - interferedBurst: Double, + totalWork: Double, + grantedWork: Double, + overcommittedWork: Double, + interferedWork: Double, cpuUsage: Double, cpuDemand: Double, powerDraw: Double, - numberOfDeployedImages: Int, + instanceCount: Int, host: Host, ) { processHostEvent( @@ -60,11 +60,11 @@ public class WebExperimentMonitor : ExperimentMonitor { time, 5 * 60 * 1000L, host, - numberOfDeployedImages, - requestedBurst, - grantedBurst, - overcommissionedBurst, - interferedBurst, + instanceCount, + totalWork.toLong(), + grantedWork.toLong(), + overcommittedWork.toLong(), + interferedWork.toLong(), cpuUsage, cpuDemand, powerDraw, @@ -120,7 +120,7 @@ public class WebExperimentMonitor : ExperimentMonitor { private var provisionerMetrics: AggregateProvisionerMetrics = AggregateProvisionerMetrics() - override fun reportProvisionerMetrics( + override fun reportServiceData( time: Long, totalHostCount: Int, availableHostCount: Int, -- cgit v1.2.3 From e5b79b18dab4f2874f3c5730b7e599dc74573c8d Mon Sep 17 00:00:00 2001 From: Fabian Mastenbroek Date: Wed, 25 Aug 2021 18:24:55 +0200 Subject: refactor(capelin): Simplify metric extraction for monitor This change updates the metric exporter logic in the Capelin module and reduces the number of allocations by looping directly over the collection of metrics. --- .../experiments/capelin/ExperimentHelpers.kt | 38 ++++++--- .../capelin/monitor/ExperimentMetricExporter.kt | 91 +++++++++++----------- 2 files changed, 74 insertions(+), 55 deletions(-) diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 4cffb8d3..7f428b2a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,6 +24,7 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -208,24 +209,41 @@ class ComputeMetrics { var runningVms: Int = 0 var unscheduledVms: Int = 0 var finishedVms: Int = 0 + var hosts: Int = 0 + var availableHosts = 0 } /** * Collect the metrics of the compute service. */ fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + return extractComputeMetrics(metricProducer.collectAllMetrics()) +} + +/** + * Extract an [ComputeMetrics] object from the specified list of metric data. + */ +internal fun extractComputeMetrics(metrics: Collection): ComputeMetrics { val res = ComputeMetrics() - try { - // Hack to extract metrics from OpenTelemetry SDK - res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - } catch (cause: Throwable) { - logger.warn(cause) { "Failed to collect metrics" } + for (metric in metrics) { + val points = metric.longSumData.points + + if (points.isEmpty()) { + continue + } + + val value = points.first().value.toInt() + when (metric.name) { + "servers.submitted" -> res.submittedVms = value + "servers.waiting" -> res.queuedVms = value + "servers.unscheduled" -> res.unscheduledVms = value + "servers.active" -> res.runningVms = value + "servers.finished" -> res.finishedVms = value + "hosts.total" -> res.hosts = value + "hosts.available" -> res.availableHosts = value + } } + return res } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index be94593c..e9c817de 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.compute.service.driver.Host +import org.opendc.experiments.capelin.extractComputeMetrics import java.time.Clock /** @@ -37,44 +38,50 @@ public class ExperimentMetricExporter( private val clock: Clock, private val hosts: Map ) : MetricExporter { - private val hostKey = ResourceAttributes.HOST_ID override fun export(metrics: Collection): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - reportHostMetrics(metricsByName) - reportProvisionerMetrics(metricsByName) - return CompletableResultCode.ofSuccess() + return try { + reportHostMetrics(metrics) + reportProvisionerMetrics(metrics) + CompletableResultCode.ofSuccess() + } catch (e: Throwable) { + CompletableResultCode.ofFailure() + } } private var lastHostMetrics: Map = emptyMap() private val hostMetricsSingleton = HostMetrics() - private fun reportHostMetrics(metrics: Map) { + private fun reportHostMetrics(metrics: Collection) { val hostMetrics = mutableMapOf() hosts.mapValuesTo(hostMetrics) { HostMetrics() } - mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v } - mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v } - mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v } - mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v } - mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v } - mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v } - mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v } - mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() } + for (metric in metrics) { + when (metric.name) { + "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } + "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } + "power.usage" -> mapDoubleGauge(metric, hostMetrics) { m, v -> m.powerDraw = v } + "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } + "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } + "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } + "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } + "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } + } + } for ((id, hostMetric) in hostMetrics) { val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) monitor.reportHostData( clock.millis(), - hostMetric.requestedBurst - lastHostMetric.requestedBurst, - hostMetric.grantedBurst - lastHostMetric.grantedBurst, - hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst, - hostMetric.interferedBurst - lastHostMetric.interferedBurst, + hostMetric.totalWork - lastHostMetric.totalWork, + hostMetric.grantedWork - lastHostMetric.grantedWork, + hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, + hostMetric.interferedWork - lastHostMetric.interferedWork, hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, - hostMetric.numberOfDeployedImages, + hostMetric.instanceCount, host ) } @@ -82,10 +89,10 @@ public class ExperimentMetricExporter( lastHostMetrics = hostMetrics } - private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSummaryData?.points ?: emptyList() + private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { + val points = data.doubleSummaryData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -99,7 +106,7 @@ public class ExperimentMetricExporter( private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleGaugeData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -111,7 +118,7 @@ public class ExperimentMetricExporter( private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -123,7 +130,7 @@ public class ExperimentMetricExporter( private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -132,35 +139,29 @@ public class ExperimentMetricExporter( } } - private fun reportProvisionerMetrics(metrics: Map) { - val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + private fun reportProvisionerMetrics(metrics: Collection) { + val res = extractComputeMetrics(metrics) monitor.reportServiceData( clock.millis(), - hosts, - availableHosts, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms + res.hosts, + res.availableHosts, + res.submittedVms, + res.runningVms, + res.finishedVms, + res.queuedVms, + res.unscheduledVms ) } private class HostMetrics { - var requestedBurst: Double = 0.0 - var grantedBurst: Double = 0.0 - var overcommissionedBurst: Double = 0.0 - var interferedBurst: Double = 0.0 + var totalWork: Double = 0.0 + var grantedWork: Double = 0.0 + var overcommittedWork: Double = 0.0 + var interferedWork: Double = 0.0 var cpuUsage: Double = 0.0 var cpuDemand: Double = 0.0 - var numberOfDeployedImages: Int = 0 + var instanceCount: Int = 0 var powerDraw: Double = 0.0 } -- cgit v1.2.3