summaryrefslogtreecommitdiff
path: root/opendc-experiments/opendc-experiments-capelin/src
diff options
context:
space:
mode:
authorFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-25 18:24:55 +0200
committerFabian Mastenbroek <mail.fabianm@gmail.com>2021-08-25 18:24:55 +0200
commite5b79b18dab4f2874f3c5730b7e599dc74573c8d (patch)
tree8e9b054a770b3a048b5cfb44e3f6bb4dff57315e /opendc-experiments/opendc-experiments-capelin/src
parentbb6066e1cecc55a50ac29da200bf3beba1ddd80b (diff)
refactor(capelin): Simplify metric extraction for monitor
This change updates the metric exporter logic in the Capelin module and reduces the number of allocations by looping directly over the collection of metrics.
Diffstat (limited to 'opendc-experiments/opendc-experiments-capelin/src')
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt38
-rw-r--r--opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt91
2 files changed, 74 insertions, 55 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
index 4cffb8d3..7f428b2a 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt
@@ -24,6 +24,7 @@ package org.opendc.experiments.capelin
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.sdk.metrics.SdkMeterProvider
+import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
@@ -208,24 +209,41 @@ class ComputeMetrics {
var runningVms: Int = 0
var unscheduledVms: Int = 0
var finishedVms: Int = 0
+ var hosts: Int = 0
+ var availableHosts = 0
}
/**
* Collect the metrics of the compute service.
*/
fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics {
- val metrics = metricProducer.collectAllMetrics().associateBy { it.name }
+ return extractComputeMetrics(metricProducer.collectAllMetrics())
+}
+
+/**
+ * Extract an [ComputeMetrics] object from the specified list of metric data.
+ */
+internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics {
val res = ComputeMetrics()
- try {
- // Hack to extract metrics from OpenTelemetry SDK
- res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- } catch (cause: Throwable) {
- logger.warn(cause) { "Failed to collect metrics" }
+ for (metric in metrics) {
+ val points = metric.longSumData.points
+
+ if (points.isEmpty()) {
+ continue
+ }
+
+ val value = points.first().value.toInt()
+ when (metric.name) {
+ "servers.submitted" -> res.submittedVms = value
+ "servers.waiting" -> res.queuedVms = value
+ "servers.unscheduled" -> res.unscheduledVms = value
+ "servers.active" -> res.runningVms = value
+ "servers.finished" -> res.finishedVms = value
+ "hosts.total" -> res.hosts = value
+ "hosts.available" -> res.availableHosts = value
+ }
}
+
return res
}
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
index be94593c..e9c817de 100644
--- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
+++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricExporter
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import org.opendc.compute.service.driver.Host
+import org.opendc.experiments.capelin.extractComputeMetrics
import java.time.Clock
/**
@@ -37,44 +38,50 @@ public class ExperimentMetricExporter(
private val clock: Clock,
private val hosts: Map<String, Host>
) : MetricExporter {
- private val hostKey = ResourceAttributes.HOST_ID
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
- val metricsByName = metrics.associateBy { it.name }
- reportHostMetrics(metricsByName)
- reportProvisionerMetrics(metricsByName)
- return CompletableResultCode.ofSuccess()
+ return try {
+ reportHostMetrics(metrics)
+ reportProvisionerMetrics(metrics)
+ CompletableResultCode.ofSuccess()
+ } catch (e: Throwable) {
+ CompletableResultCode.ofFailure()
+ }
}
private var lastHostMetrics: Map<String, HostMetrics> = emptyMap()
private val hostMetricsSingleton = HostMetrics()
- private fun reportHostMetrics(metrics: Map<String, MetricData>) {
+ private fun reportHostMetrics(metrics: Collection<MetricData>) {
val hostMetrics = mutableMapOf<String, HostMetrics>()
hosts.mapValuesTo(hostMetrics) { HostMetrics() }
- mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v }
- mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v }
- mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v }
- mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v }
- mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v }
- mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v }
- mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v }
- mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() }
+ for (metric in metrics) {
+ when (metric.name) {
+ "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v }
+ "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v }
+ "power.usage" -> mapDoubleGauge(metric, hostMetrics) { m, v -> m.powerDraw = v }
+ "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v }
+ "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v }
+ "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v }
+ "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v }
+ "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() }
+ }
+ }
for ((id, hostMetric) in hostMetrics) {
val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton)
val host = hosts.getValue(id)
monitor.reportHostData(
clock.millis(),
- hostMetric.requestedBurst - lastHostMetric.requestedBurst,
- hostMetric.grantedBurst - lastHostMetric.grantedBurst,
- hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst,
- hostMetric.interferedBurst - lastHostMetric.interferedBurst,
+ hostMetric.totalWork - lastHostMetric.totalWork,
+ hostMetric.grantedWork - lastHostMetric.grantedWork,
+ hostMetric.overcommittedWork - lastHostMetric.overcommittedWork,
+ hostMetric.interferedWork - lastHostMetric.interferedWork,
hostMetric.cpuUsage,
hostMetric.cpuDemand,
hostMetric.powerDraw,
- hostMetric.numberOfDeployedImages,
+ hostMetric.instanceCount,
host
)
}
@@ -82,10 +89,10 @@ public class ExperimentMetricExporter(
lastHostMetrics = hostMetrics
}
- private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
- val points = data?.doubleSummaryData?.points ?: emptyList()
+ private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
+ val points = data.doubleSummaryData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -99,7 +106,7 @@ public class ExperimentMetricExporter(
private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
val points = data?.doubleGaugeData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -111,7 +118,7 @@ public class ExperimentMetricExporter(
private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) {
val points = data?.longSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -123,7 +130,7 @@ public class ExperimentMetricExporter(
private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) {
val points = data?.doubleSumData?.points ?: emptyList()
for (point in points) {
- val uid = point.attributes[hostKey]
+ val uid = point.attributes[ResourceAttributes.HOST_ID]
val hostMetric = hostMetrics[uid]
if (hostMetric != null) {
@@ -132,35 +139,29 @@ public class ExperimentMetricExporter(
}
}
- private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) {
- val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
- val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0
+ private fun reportProvisionerMetrics(metrics: Collection<MetricData>) {
+ val res = extractComputeMetrics(metrics)
monitor.reportServiceData(
clock.millis(),
- hosts,
- availableHosts,
- submittedVms,
- runningVms,
- finishedVms,
- queuedVms,
- unscheduledVms
+ res.hosts,
+ res.availableHosts,
+ res.submittedVms,
+ res.runningVms,
+ res.finishedVms,
+ res.queuedVms,
+ res.unscheduledVms
)
}
private class HostMetrics {
- var requestedBurst: Double = 0.0
- var grantedBurst: Double = 0.0
- var overcommissionedBurst: Double = 0.0
- var interferedBurst: Double = 0.0
+ var totalWork: Double = 0.0
+ var grantedWork: Double = 0.0
+ var overcommittedWork: Double = 0.0
+ var interferedWork: Double = 0.0
var cpuUsage: Double = 0.0
var cpuDemand: Double = 0.0
- var numberOfDeployedImages: Int = 0
+ var instanceCount: Int = 0
var powerDraw: Double = 0.0
}