summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-08-25 18:24:55 +0200
committer Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-08-25 18:24:55 +0200
commit e5b79b18dab4f2874f3c5730b7e599dc74573c8d (patch)
tree 8e9b054a770b3a048b5cfb44e3f6bb4dff57315e
parent bb6066e1cecc55a50ac29da200bf3beba1ddd80b (diff)
refactor(capelin): Simplify metric extraction for monitor
This change updates the metric exporter logic in the Capelin module and reduces the number of allocations by looping directly over the collection of metrics.
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt91
2 files changed, 74 insertions, 55 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 4cffb8d3..7f428b2a 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,6 +24,7 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
@@ -208,24 +209,41 @@ class ComputeMetrics {
var runningVms: Int = 0
var unscheduledVms: Int = 0
var finishedVms: Int = 0
+ var hosts: Int = 0
+ var availableHosts = 0
}
/**
* Collect the metrics of the compute service.
*/
fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ return extractComputeMetrics(metricProducer.collectAllMetrics())
+}
+
+/**
+ * Extract a [ComputeMetrics] object from the specified collection of metric data.
+ */
+internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics {
val res = ComputeMetrics()
- try {
- // Hack to extract metrics from OpenTelemetry SDK
- res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- } catch (cause: Throwable) {
- logger.warn(cause) { "Failed to collect metrics" }
+ for (metric in metrics) {
+ val points = metric.longSumData.points
+
+ if (points.isEmpty()) {
+ continue
+ }
+
+ val value = points.first().value.toInt()
+ when (metric.name) {
+ "servers.submitted" -> res.submittedVms = value
+ "servers.waiting" -> res.queuedVms = value
+ "servers.unscheduled" -> res.unscheduledVms = value
+ "servers.active" -> res.runningVms = value
+ "servers.finished" -> res.finishedVms = value
+ "hosts.total" -> res.hosts = value
+ "hosts.available" -> res.availableHosts = value
+ }
}
+
return res
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index be94593c..e9c817de 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.compute.service.driver.Host
+import org.opendc.experiments.capelin.extractComputeMetrics
import java.time.Clock
/**
@@ -37,44 +38,50 @@ public class ExperimentMetricExporter(
private val clock: Clock,
private val hosts: Map<String, Host>
) : MetricExporter {
- private val hostKey = ResourceAttributes.HOST_ID
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- reportHostMetrics(metricsByName)
- reportProvisionerMetrics(metricsByName)
- return CompletableResultCode.ofSuccess()
+ return try {
+ reportHostMetrics(metrics)
+ reportProvisionerMetrics(metrics)
+ CompletableResultCode.ofSuccess()
+ } catch (e: Throwable) {
+ CompletableResultCode.ofFailure()
+ }
}
private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
private val hostMetricsSingleton = HostMetrics()
- private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ private fun reportHostMetrics(metrics: Collection<MetricData>) {
val hostMetrics = mutableMapOf<String, HostMetrics>()
hosts.mapValuesTo(hostMetrics) { HostMetrics() }
- mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v }
- mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v }
- mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v }
- mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v }
- mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v }
- mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v }
- mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v }
- mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() }
+ for (metric in metrics) {
+ when (metric.name) {
+ "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v }
+ "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v }
+ "power.usage" -> mapDoubleGauge(metric, hostMetrics) { m, v -> m.powerDraw = v }
+ "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v }
+ "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v }
+ "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
+ "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
+ "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
+ }
+ }
for ((id, hostMetric) in hostMetrics) {
val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
val host = hosts.getValue(id)
monitor.reportHostData(
clock.millis(),
- hostMetric.requestedBurst - lastHostMetric.requestedBurst,
- hostMetric.grantedBurst - lastHostMetric.grantedBurst,
- hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst,
- hostMetric.interferedBurst - lastHostMetric.interferedBurst,
+ hostMetric.totalWork - lastHostMetric.totalWork,
+ hostMetric.grantedWork - lastHostMetric.grantedWork,
+ hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
+ hostMetric.interferedWork - lastHostMetric.interferedWork,
hostMetric.cpuUsage,
hostMetric.cpuDemand,
hostMetric.powerDraw,
- hostMetric.numberOfDeployedImages,
+ hostMetric.instanceCount,
host
)
}
@@ -82,10 +89,10 @@ public class ExperimentMetricExporter(
lastHostMetrics = hostMetrics
}
- private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSummaryData?.points ?: emptyList()
+ private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data.doubleSummaryData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -99,7 +106,7 @@ public class ExperimentMetricExporter(
private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
val points = data?.doubleGaugeData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -111,7 +118,7 @@ public class ExperimentMetricExporter(
private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
val points = data?.longSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -123,7 +130,7 @@ public class ExperimentMetricExporter(
private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
val points = data?.doubleSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -132,35 +139,29 @@ public class ExperimentMetricExporter(
}
}
- private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
- val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ private fun reportProvisionerMetrics(metrics: Collection<MetricData>) {
+ val res = extractComputeMetrics(metrics)
monitor.reportServiceData(
clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
+ res.hosts,
+ res.availableHosts,
+ res.submittedVms,
+ res.runningVms,
+ res.finishedVms,
+ res.queuedVms,
+ res.unscheduledVms
)
}
private class HostMetrics {
- var requestedBurst: Double = 0.0
- var grantedBurst: Double = 0.0
- var overcommissionedBurst: Double = 0.0
- var interferedBurst: Double = 0.0
+ var totalWork: Double = 0.0
+ var grantedWork: Double = 0.0
+ var overcommittedWork: Double = 0.0
+ var interferedWork: Double = 0.0
var cpuUsage: Double = 0.0
var cpuDemand: Double = 0.0
- var numberOfDeployedImages: Int = 0
+ var instanceCount: Int = 0
var powerDraw: Double = 0.0
}