diff options
Diffstat (limited to 'opendc-experiments')
2 files changed, 74 insertions, 55 deletions
diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt index 4cffb8d3..7f428b2a 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/ExperimentHelpers.kt @@ -24,6 +24,7 @@ package org.opendc.experiments.capelin import io.opentelemetry.api.metrics.MeterProvider import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -208,24 +209,41 @@ class ComputeMetrics { var runningVms: Int = 0 var unscheduledVms: Int = 0 var finishedVms: Int = 0 + var hosts: Int = 0 + var availableHosts = 0 } /** * Collect the metrics of the compute service. */ fun collectMetrics(metricProducer: MetricProducer): ComputeMetrics { - val metrics = metricProducer.collectAllMetrics().associateBy { it.name } + return extractComputeMetrics(metricProducer.collectAllMetrics()) +} + +/** + * Extract an [ComputeMetrics] object from the specified list of metric data. + */ +internal fun extractComputeMetrics(metrics: Collection<MetricData>): ComputeMetrics { val res = ComputeMetrics() - try { - // Hack to extract metrics from OpenTelemetry SDK - res.submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - res.finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - } catch (cause: Throwable) { - logger.warn(cause) { "Failed to collect metrics" } + for (metric in metrics) { + val points = metric.longSumData.points + + if (points.isEmpty()) { + continue + } + + val value = points.first().value.toInt() + when (metric.name) { + "servers.submitted" -> res.submittedVms = value + "servers.waiting" -> res.queuedVms = value + "servers.unscheduled" -> res.unscheduledVms = value + "servers.active" -> res.runningVms = value + "servers.finished" -> res.finishedVms = value + "hosts.total" -> res.hosts = value + "hosts.available" -> res.availableHosts = value + } } + return res } diff --git a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt index be94593c..e9c817de 100644 --- a/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt +++ b/opendc-experiments/opendc-experiments-capelin/src/main/kotlin/org/opendc/experiments/capelin/monitor/ExperimentMetricExporter.kt @@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.semconv.resource.attributes.ResourceAttributes import org.opendc.compute.service.driver.Host +import org.opendc.experiments.capelin.extractComputeMetrics import java.time.Clock /** @@ -37,44 +38,50 @@ public class ExperimentMetricExporter( private val clock: Clock, private val hosts: Map<String, Host> ) : MetricExporter { - private val hostKey = ResourceAttributes.HOST_ID override fun export(metrics: Collection<MetricData>): CompletableResultCode { - val metricsByName = metrics.associateBy { it.name } - reportHostMetrics(metricsByName) - reportProvisionerMetrics(metricsByName) - return CompletableResultCode.ofSuccess() + return try { + reportHostMetrics(metrics) + reportProvisionerMetrics(metrics) + CompletableResultCode.ofSuccess() + } catch (e: Throwable) { + CompletableResultCode.ofFailure() + } } private var lastHostMetrics: Map<String, HostMetrics> = emptyMap() private val hostMetricsSingleton = HostMetrics() - private fun reportHostMetrics(metrics: Map<String, MetricData>) { + private fun reportHostMetrics(metrics: Collection<MetricData>) { val hostMetrics = mutableMapOf<String, HostMetrics>() hosts.mapValuesTo(hostMetrics) { HostMetrics() } - mapDoubleSummary(metrics["cpu.demand"], hostMetrics) { m, v -> m.cpuDemand = v } - mapDoubleSummary(metrics["cpu.usage"], hostMetrics) { m, v -> m.cpuUsage = v } - mapDoubleGauge(metrics["power.usage"], hostMetrics) { m, v -> m.powerDraw = v } - mapDoubleSum(metrics["cpu.work.total"], hostMetrics) { m, v -> m.requestedBurst = v } - mapDoubleSum(metrics["cpu.work.granted"], hostMetrics) { m, v -> m.grantedBurst = v } - mapDoubleSum(metrics["cpu.work.overcommit"], hostMetrics) { m, v -> m.overcommissionedBurst = v } - mapDoubleSum(metrics["cpu.work.interference"], hostMetrics) { m, v -> m.interferedBurst = v } - mapLongSum(metrics["guests.active"], hostMetrics) { m, v -> m.numberOfDeployedImages = v.toInt() } + for (metric in metrics) { + when (metric.name) { + "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } + "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } + "power.usage" -> mapDoubleGauge(metric, hostMetrics) { m, v -> m.powerDraw = v } + "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } + "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } + "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } + "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } + "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } + } + } for ((id, hostMetric) in hostMetrics) { val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) val host = hosts.getValue(id) monitor.reportHostData( clock.millis(), - hostMetric.requestedBurst - lastHostMetric.requestedBurst, - hostMetric.grantedBurst - lastHostMetric.grantedBurst, - hostMetric.overcommissionedBurst - lastHostMetric.overcommissionedBurst, - hostMetric.interferedBurst - lastHostMetric.interferedBurst, + hostMetric.totalWork - lastHostMetric.totalWork, + hostMetric.grantedWork - lastHostMetric.grantedWork, + hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, + hostMetric.interferedWork - lastHostMetric.interferedWork, hostMetric.cpuUsage, hostMetric.cpuDemand, hostMetric.powerDraw, - hostMetric.numberOfDeployedImages, + hostMetric.instanceCount, host ) } @@ -82,10 +89,10 @@ public class ExperimentMetricExporter( lastHostMetrics = hostMetrics } - private fun mapDoubleSummary(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { - val points = data?.doubleSummaryData?.points ?: emptyList() + private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { + val points = data.doubleSummaryData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -99,7 +106,7 @@ public class ExperimentMetricExporter( private fun mapDoubleGauge(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleGaugeData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -111,7 +118,7 @@ public class ExperimentMetricExporter( private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Long) -> Unit) { val points = data?.longSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -123,7 +130,7 @@ public class ExperimentMetricExporter( private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HostMetrics>, block: (HostMetrics, Double) -> Unit) { val points = data?.doubleSumData?.points ?: emptyList() for (point in points) { - val uid = point.attributes[hostKey] + val uid = point.attributes[ResourceAttributes.HOST_ID] val hostMetric = hostMetrics[uid] if (hostMetric != null) { @@ -132,35 +139,29 @@ public class ExperimentMetricExporter( } } - private fun reportProvisionerMetrics(metrics: Map<String, MetricData>) { - val submittedVms = metrics["servers.submitted"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val queuedVms = metrics["servers.waiting"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val unscheduledVms = metrics["servers.unscheduled"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val runningVms = metrics["servers.active"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val finishedVms = metrics["servers.finished"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val hosts = metrics["hosts.total"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 - val availableHosts = metrics["hosts.available"]?.longSumData?.points?.last()?.value?.toInt() ?: 0 + private fun reportProvisionerMetrics(metrics: Collection<MetricData>) { + val res = extractComputeMetrics(metrics) monitor.reportServiceData( clock.millis(), - hosts, - availableHosts, - submittedVms, - runningVms, - finishedVms, - queuedVms, - unscheduledVms + res.hosts, + res.availableHosts, + res.submittedVms, + res.runningVms, + res.finishedVms, + res.queuedVms, + res.unscheduledVms ) } private class HostMetrics { - var requestedBurst: Double = 0.0 - var grantedBurst: Double = 0.0 - var overcommissionedBurst: Double = 0.0 - var interferedBurst: Double = 0.0 + var totalWork: Double = 0.0 + var grantedWork: Double = 0.0 + var overcommittedWork: Double = 0.0 + var interferedWork: Double = 0.0 var cpuUsage: Double = 0.0 var cpuDemand: Double = 0.0 - var numberOfDeployedImages: Int = 0 + var instanceCount: Int = 0 var powerDraw: Double = 0.0 } |
