summaryrefslogtreecommitdiff
path: root/opendc-telemetry/opendc-telemetry-compute
diff options
context:
space:
mode:
author: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-07 17:30:46 +0200
committer: Fabian Mastenbroek <mail.fabianm@gmail.com> 2021-09-17 16:52:29 +0200
commit: 0d8bccc68705d036fbf60f312d9c34ca4392c6b2 (patch)
tree: faa50b8bf29976531e2ba757269ceb746195737d /opendc-telemetry/opendc-telemetry-compute
parent: 8d899e29dbd757f6df320212d6e0d77ce8216ab9 (diff)
refactor(telemetry): Standardize SimHost metrics
This change standardizes the metrics emitted by SimHost instances and their guests based on the OpenTelemetry semantic conventions. We now also report CPU time as opposed to CPU work as this metric is more commonly used.
Diffstat (limited to 'opendc-telemetry/opendc-telemetry-compute')
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt448
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt243
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt55
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt33
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt19
-rw-r--r--opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt18
6 files changed, 511 insertions, 305 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
new file mode 100644
index 00000000..e9449634
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -0,0 +1,448 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.PointData
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.opendc.telemetry.compute.table.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ */
+public class ComputeMetricAggregator {
+ private val _service = ServiceAggregator()
+ private val _hosts = mutableMapOf<String, HostAggregator>()
+ private val _servers = mutableMapOf<String, ServerAggregator>()
+
+    /**
+     * Process the specified [metrics] for this cycle.
+     *
+     * Each metric is dispatched on its name:
+     * - `scheduler.hosts`, `scheduler.servers` and `scheduler.attempts` update the service aggregator.
+     * - `scheduler.latency` updates the aggregator of the server identified by each point.
+     * - `system.*` metrics update the aggregator of the host identified by the metric's resource. For the
+     *   metrics that are inspected per point (`system.cpu.limit`, `system.cpu.time`, `system.time` and
+     *   `system.time.boot`), a point that carries a server identifier is attributed to that server instead.
+     *
+     * Metrics with any other name are ignored, as are `system.*` metrics whose resource has no host
+     * identifier and single-valued metrics that carry no points.
+     *
+     * @param metrics The metrics that have been collected during this cycle.
+     */
+    public fun process(metrics: Collection<MetricData>) {
+        val service = _service
+        val hosts = _hosts
+        val servers = _servers
+
+        for (metric in metrics) {
+            val resource = metric.resource
+
+            when (metric.name) {
+                // ComputeService
+                "scheduler.hosts" -> {
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[STATE_KEY]) {
+                            "up" -> service.hostsUp = point.value.toInt()
+                            "down" -> service.hostsDown = point.value.toInt()
+                        }
+                    }
+                }
+                "scheduler.servers" -> {
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[STATE_KEY]) {
+                            "pending" -> service.serversPending = point.value.toInt()
+                            "active" -> service.serversActive = point.value.toInt()
+                        }
+                    }
+                }
+                "scheduler.attempts" -> {
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[RESULT_KEY]) {
+                            "success" -> service.attemptsSuccess = point.value.toInt()
+                            "failure" -> service.attemptsFailure = point.value.toInt()
+                            "error" -> service.attemptsError = point.value.toInt()
+                        }
+                    }
+                }
+                "scheduler.latency" -> {
+                    for (point in metric.doubleHistogramData.points) {
+                        val server = getServer(servers, point) ?: continue
+                        // A histogram without recordings has no mean: dividing by a zero count yields NaN,
+                        // which roundToLong() rejects with an exception. Keep the previous latency instead.
+                        if (point.count > 0) {
+                            server.schedulingLatency = (point.sum / point.count).roundToLong()
+                        }
+                    }
+                }
+
+                // SimHost
+                "system.guests" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[STATE_KEY]) {
+                            "terminated" -> agg.guestsTerminated = point.value.toInt()
+                            "running" -> agg.guestsRunning = point.value.toInt()
+                            // Guests in the error state have their own counter; they must not overwrite
+                            // the number of running guests.
+                            "error" -> agg.guestsError = point.value.toInt()
+                            "invalid" -> agg.guestsInvalid = point.value.toInt()
+                        }
+                    }
+                }
+                "system.cpu.limit" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.doubleGaugeData.points) {
+                        val server = getServer(servers, point)
+
+                        if (server != null) {
+                            server.cpuLimit = point.value
+                            server.host = agg.host
+                        } else {
+                            agg.cpuLimit = point.value
+                        }
+                    }
+                }
+                // The single-valued host metrics below only look at the first point. A metric without
+                // points is skipped rather than failing the entire cycle.
+                "system.cpu.usage" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    val point = metric.doubleGaugeData.points.firstOrNull() ?: continue
+                    agg.cpuUsage = point.value
+                }
+                "system.cpu.demand" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    val point = metric.doubleGaugeData.points.firstOrNull() ?: continue
+                    agg.cpuDemand = point.value
+                }
+                "system.cpu.utilization" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    val point = metric.doubleGaugeData.points.firstOrNull() ?: continue
+                    agg.cpuUtilization = point.value
+                }
+                "system.cpu.time" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longSumData.points) {
+                        val server = getServer(servers, point)
+                        val state = point.attributes[STATE_KEY]
+                        if (server != null) {
+                            when (state) {
+                                "active" -> server.cpuActiveTime = point.value
+                                "idle" -> server.cpuIdleTime = point.value
+                                "steal" -> server.cpuStealTime = point.value
+                                "lost" -> server.cpuLostTime = point.value
+                            }
+                            server.host = agg.host
+                        } else {
+                            when (state) {
+                                "active" -> agg.cpuActiveTime = point.value
+                                "idle" -> agg.cpuIdleTime = point.value
+                                "steal" -> agg.cpuStealTime = point.value
+                                "lost" -> agg.cpuLostTime = point.value
+                            }
+                        }
+                    }
+                }
+                "system.power.usage" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    val point = metric.doubleGaugeData.points.firstOrNull() ?: continue
+                    agg.powerUsage = point.value
+                }
+                "system.power.total" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    val point = metric.doubleSumData.points.firstOrNull() ?: continue
+                    agg.powerTotal = point.value
+                }
+                "system.time" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longSumData.points) {
+                        val server = getServer(servers, point)
+
+                        if (server != null) {
+                            when (point.attributes[STATE_KEY]) {
+                                "up" -> server.uptime = point.value
+                                "down" -> server.downtime = point.value
+                            }
+                            server.host = agg.host
+                        } else {
+                            when (point.attributes[STATE_KEY]) {
+                                "up" -> agg.uptime = point.value
+                                "down" -> agg.downtime = point.value
+                            }
+                        }
+                    }
+                }
+                "system.time.boot" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longGaugeData.points) {
+                        val server = getServer(servers, point)
+
+                        if (server != null) {
+                            server.bootTime = point.value
+                            server.host = agg.host
+                        } else {
+                            agg.bootTime = point.value
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Report the aggregated state of this cycle to the given [monitor].
+     *
+     * The service data is recorded first, followed by the data of every known host and then the data of
+     * every known server.
+     *
+     * @param now The timestamp to attach to the reported data.
+     * @param monitor The [ComputeMonitor] that receives the data.
+     */
+    public fun collect(now: Instant, monitor: ComputeMonitor) {
+        val serviceData = _service.collect(now)
+        monitor.record(serviceData)
+
+        _hosts.values.forEach { hostAgg -> monitor.record(hostAgg.collect(now)) }
+        _servers.values.forEach { serverAgg -> monitor.record(serverAgg.collect(now)) }
+    }
+
+    /**
+     * Look up the [HostAggregator] that belongs to [resource], creating it on first use.
+     *
+     * @param hosts The aggregators created so far, keyed by host identifier.
+     * @param resource The resource describing the host.
+     * @return The aggregator for the host, or `null` if [resource] carries no host identifier.
+     */
+    private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
+        val hostId = resource.attributes[HOST_ID] ?: return null
+        return hosts.computeIfAbsent(hostId) { HostAggregator(resource) }
+    }
+
+    /**
+     * Look up the [ServerAggregator] that belongs to [point], creating it on first use.
+     *
+     * @param servers The aggregators created so far, keyed by server identifier.
+     * @param point The data point whose attributes describe the server.
+     * @return The aggregator for the server, or `null` if [point] carries no server identifier.
+     */
+    private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
+        val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: return null
+        return servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) }
+    }
+
+    /**
+     * Holds the most recently observed service-level counters until they are reported.
+     *
+     * The counters are not reset when they are collected.
+     */
+    internal class ServiceAggregator {
+        @JvmField var hostsUp = 0
+        @JvmField var hostsDown = 0
+
+        @JvmField var serversPending = 0
+        @JvmField var serversActive = 0
+
+        @JvmField var attemptsSuccess = 0
+        @JvmField var attemptsFailure = 0
+        @JvmField var attemptsError = 0
+
+        /**
+         * Finish the aggregation for this cycle.
+         *
+         * @param now The timestamp to attach to the result.
+         * @return An immutable [ServiceData] snapshot of the current counters.
+         */
+        fun collect(now: Instant): ServiceData {
+            return ServiceData(
+                now,
+                hostsUp,
+                hostsDown,
+                serversPending,
+                serversActive,
+                attemptsSuccess,
+                attemptsFailure,
+                attemptsError
+            )
+        }
+    }
+
+    /**
+     * Collects the metrics of a single host until they are reported.
+     *
+     * CPU times, total power, uptime and downtime are stored as received and reported as the difference
+     * with the value seen at the previous collection. The guest counts, CPU limit/usage/demand/utilization
+     * and power usage are reported as-is and cleared after every collection.
+     */
+    internal class HostAggregator(resource: Resource) {
+        /**
+         * The static information about this host.
+         */
+        val host = HostInfo(
+            resource.attributes[HOST_ID]!!,
+            resource.attributes[HOST_NAME] ?: "",
+            resource.attributes[HOST_ARCH] ?: "",
+            resource.attributes[HOST_NCPUS]?.toInt() ?: 0,
+            resource.attributes[HOST_MEM_CAPACITY] ?: 0,
+        )
+
+        // Values that are cleared after every collection.
+        @JvmField var guestsTerminated = 0
+        @JvmField var guestsRunning = 0
+        @JvmField var guestsError = 0
+        @JvmField var guestsInvalid = 0
+
+        @JvmField var cpuLimit = 0.0
+        @JvmField var cpuUsage = 0.0
+        @JvmField var cpuDemand = 0.0
+        @JvmField var cpuUtilization = 0.0
+
+        @JvmField var powerUsage = 0.0
+
+        // Values that are reported relative to the previous collection.
+        @JvmField var cpuActiveTime = 0L
+        @JvmField var cpuIdleTime = 0L
+        @JvmField var cpuStealTime = 0L
+        @JvmField var cpuLostTime = 0L
+        @JvmField var powerTotal = 0.0
+        @JvmField var uptime = 0L
+        @JvmField var downtime = 0L
+
+        // Long.MIN_VALUE marks a boot time that has not been reported.
+        @JvmField var bootTime = Long.MIN_VALUE
+
+        // The values observed at the previous collection, used to compute the differences.
+        private var previousCpuActiveTime = 0L
+        private var previousCpuIdleTime = 0L
+        private var previousCpuStealTime = 0L
+        private var previousCpuLostTime = 0L
+        private var previousPowerTotal = 0.0
+        private var previousUptime = 0L
+        private var previousDowntime = 0L
+
+        /**
+         * Finish the aggregation for this cycle.
+         *
+         * @param now The timestamp to attach to the result.
+         * @return An immutable [HostData] snapshot of this cycle.
+         */
+        fun collect(now: Instant): HostData {
+            // The snapshot must be taken before any state is updated for the next cycle.
+            val snapshot = HostData(
+                now,
+                host,
+                guestsTerminated,
+                guestsRunning,
+                guestsError,
+                guestsInvalid,
+                cpuLimit,
+                cpuUsage,
+                cpuDemand,
+                cpuUtilization,
+                cpuActiveTime - previousCpuActiveTime,
+                cpuIdleTime - previousCpuIdleTime,
+                cpuStealTime - previousCpuStealTime,
+                cpuLostTime - previousCpuLostTime,
+                powerUsage,
+                powerTotal - previousPowerTotal,
+                uptime - previousUptime,
+                downtime - previousDowntime,
+                bootTime.takeIf { it != Long.MIN_VALUE }?.let { Instant.ofEpochMilli(it) }
+            )
+
+            rememberCurrentValues()
+            clearCycleValues()
+
+            return snapshot
+        }
+
+        /**
+         * Store the current values as the baseline for the next collection.
+         */
+        private fun rememberCurrentValues() {
+            previousCpuActiveTime = cpuActiveTime
+            previousCpuIdleTime = cpuIdleTime
+            previousCpuStealTime = cpuStealTime
+            previousCpuLostTime = cpuLostTime
+            previousPowerTotal = powerTotal
+            previousUptime = uptime
+            previousDowntime = downtime
+        }
+
+        /**
+         * Clear the values that are reported as-is.
+         */
+        private fun clearCycleValues() {
+            guestsTerminated = 0
+            guestsRunning = 0
+            guestsError = 0
+            guestsInvalid = 0
+
+            cpuLimit = 0.0
+            cpuUsage = 0.0
+            cpuDemand = 0.0
+            cpuUtilization = 0.0
+
+            powerUsage = 0.0
+        }
+    }
+
+    /**
+     * An aggregator for server metrics before they are reported.
+     *
+     * Uptime, downtime and the CPU times are stored as received and reported as the difference with the
+     * value seen at the previous collection. The [host] and the CPU limit are cleared after every
+     * collection; the boot time and scheduling latency are kept.
+     *
+     * @param attributes The attributes of a data point describing the server. All attributes that make up
+     * the [ServerInfo] must be present, otherwise construction fails with a [NullPointerException].
+     */
+    internal class ServerAggregator(attributes: Attributes) {
+        /**
+         * The static information about this server.
+         */
+        val server = ServerInfo(
+            attributes[ResourceAttributes.HOST_ID]!!,
+            attributes[ResourceAttributes.HOST_NAME]!!,
+            attributes[ResourceAttributes.HOST_TYPE]!!,
+            attributes[ResourceAttributes.HOST_ARCH]!!,
+            attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
+            attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
+            attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
+            attributes[AttributeKey.longKey("host.mem_capacity")]!!,
+        )
+
+        /**
+         * The [HostInfo] of the host on which the server is hosted.
+         */
+        var host: HostInfo? = null
+
+        @JvmField var uptime: Long = 0
+        private var previousUptime = 0L
+        @JvmField var downtime: Long = 0
+        private var previousDowntime = 0L
+
+        // Long.MIN_VALUE marks a boot time that has not been reported. It must match the sentinel tested in
+        // [toServerData]; starting at 0 would report the epoch as boot time for a server that never booted.
+        @JvmField var bootTime: Long = Long.MIN_VALUE
+        @JvmField var schedulingLatency = 0L
+        @JvmField var cpuLimit = 0.0
+        @JvmField var cpuActiveTime = 0L
+        @JvmField var cpuIdleTime = 0L
+        @JvmField var cpuStealTime = 0L
+        @JvmField var cpuLostTime = 0L
+        private var previousCpuActiveTime = 0L
+        private var previousCpuIdleTime = 0L
+        private var previousCpuStealTime = 0L
+        private var previousCpuLostTime = 0L
+
+        /**
+         * Finish the aggregation for this cycle.
+         *
+         * @param now The timestamp to attach to the result.
+         * @return An immutable [ServerData] snapshot of this cycle.
+         */
+        fun collect(now: Instant): ServerData {
+            // The snapshot must be taken before any state is updated for the next cycle.
+            val data = toServerData(now)
+
+            previousUptime = uptime
+            previousDowntime = downtime
+            previousCpuActiveTime = cpuActiveTime
+            previousCpuIdleTime = cpuIdleTime
+            previousCpuStealTime = cpuStealTime
+            previousCpuLostTime = cpuLostTime
+
+            host = null
+            cpuLimit = 0.0
+
+            return data
+        }
+
+        /**
+         * Convert the aggregator state into an immutable [ServerData].
+         *
+         * @param now The timestamp to attach to the result.
+         */
+        private fun toServerData(now: Instant): ServerData {
+            return ServerData(
+                now,
+                server,
+                host,
+                uptime - previousUptime,
+                downtime - previousDowntime,
+                if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
+                schedulingLatency,
+                cpuLimit,
+                cpuActiveTime - previousCpuActiveTime,
+                cpuIdleTime - previousCpuIdleTime,
+                cpuStealTime - previousCpuStealTime,
+                cpuLostTime - previousCpuLostTime
+            )
+        }
+    }
+
+    private companion object {
+        /** Key of the point attribute that holds the state a value applies to. */
+        private val STATE_KEY: AttributeKey<String> = AttributeKey.stringKey("state")
+
+        /** Key of the point attribute that holds the result a value applies to. */
+        private val RESULT_KEY: AttributeKey<String> = AttributeKey.stringKey("result")
+    }
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 408d1325..ea96f721 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -22,28 +22,24 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.HostInfo
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServerInfo
import java.time.Clock
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+ /**
+ * A [ComputeMetricAggregator] that actually performs the aggregation.
+ */
+ private val agg = ComputeMetricAggregator()
+
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
- reportServiceMetrics(metrics)
- reportHostMetrics(metrics)
- reportServerMetrics(metrics)
+ agg.process(metrics)
+ agg.collect(clock.instant(), monitor)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
@@ -53,229 +49,4 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor
override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- private fun reportServiceMetrics(metrics: Collection<MetricData>) {
- monitor.record(extractServiceMetrics(clock.millis(), metrics))
- }
-
- private val hosts = mutableMapOf<String, HostAggregator>()
- private val servers = mutableMapOf<String, ServerAggregator>()
-
- private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hosts = hosts
- val servers = servers
-
- for (metric in metrics) {
- val resource = metric.resource
- val hostId = resource.attributes[HOST_ID] ?: continue
- val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) }
- agg.accept(metric)
- }
-
- val monitor = monitor
- val now = clock.millis()
- for ((_, server) in servers) {
- server.record(monitor, now)
- }
- }
-
- private fun reportServerMetrics(metrics: Collection<MetricData>) {
- val hosts = hosts
-
- for (metric in metrics) {
- val resource = metric.resource
- val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host }
-
- when (metric.name) {
- "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point ->
- agg.schedulingLatency = point.sum / point.count
- }
- "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point ->
- agg.uptime = point.value
- }
- "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point ->
- agg.downtime = point.value
- }
- }
- }
-
- val monitor = monitor
- val now = clock.millis()
- for ((_, host) in hosts) {
- host.record(monitor, now)
- }
- }
-
- /**
- * Helper function to map a metric by the server.
- */
- private inline fun <P : PointData> mapByServer(points: Collection<P>, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) {
- for (point in points) {
- val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) }
-
- if (host != null) {
- agg.host = host
- }
-
- block(agg, point)
- }
- }
-
- /**
- * An aggregator for host metrics before they are reported.
- */
- private class HostAggregator(resource: Resource) {
- /**
- * The static information about this host.
- */
- val host = HostInfo(
- resource.attributes[HOST_ID]!!,
- resource.attributes[HOST_NAME]!!,
- resource.attributes[HOST_ARCH]!!,
- resource.attributes[HOST_NCPUS]!!.toInt(),
- resource.attributes[HOST_MEM_CAPACITY]!!,
- )
-
- private var totalWork: Double = 0.0
- private var previousTotalWork = 0.0
- private var grantedWork: Double = 0.0
- private var previousGrantedWork = 0.0
- private var overcommittedWork: Double = 0.0
- private var previousOvercommittedWork = 0.0
- private var interferedWork: Double = 0.0
- private var previousInterferedWork = 0.0
- private var cpuUsage: Double = 0.0
- private var cpuDemand: Double = 0.0
- private var instanceCount: Int = 0
- private var powerDraw: Double = 0.0
- private var uptime: Long = 0
- private var previousUptime = 0L
- private var downtime: Long = 0
- private var previousDowntime = 0L
-
- fun record(monitor: ComputeMonitor, now: Long) {
- monitor.record(
- HostData(
- now,
- host,
- totalWork - previousTotalWork,
- grantedWork - previousGrantedWork,
- overcommittedWork - previousOvercommittedWork,
- interferedWork - previousInterferedWork,
- cpuUsage,
- cpuDemand,
- instanceCount,
- powerDraw,
- uptime - previousUptime,
- downtime - previousDowntime,
- )
- )
-
- previousTotalWork = totalWork
- previousGrantedWork = grantedWork
- previousOvercommittedWork = overcommittedWork
- previousInterferedWork = interferedWork
- previousUptime = uptime
- previousDowntime = downtime
- reset()
- }
-
- /**
- * Accept the [MetricData] for this host.
- */
- fun accept(data: MetricData) {
- when (data.name) {
- "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value
- "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value
- "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value
- "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value
- "power.usage" -> powerDraw = acceptHistogram(data)
- "cpu.usage" -> cpuUsage = acceptHistogram(data)
- "cpu.demand" -> cpuDemand = acceptHistogram(data)
- "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt()
- "host.time.up" -> uptime = data.longSumData.points.first().value
- "host.time.down" -> downtime = data.longSumData.points.first().value
- }
- }
-
- private fun acceptHistogram(data: MetricData): Double {
- return when (data.type) {
- MetricDataType.HISTOGRAM -> {
- val point = data.doubleHistogramData.points.first()
- point.sum / point.count
- }
- MetricDataType.SUMMARY -> {
- val point = data.doubleSummaryData.points.first()
- point.sum / point.count
- }
- else -> error("Invalid metric type")
- }
- }
-
- private fun reset() {
- totalWork = 0.0
- grantedWork = 0.0
- overcommittedWork = 0.0
- interferedWork = 0.0
- cpuUsage = 0.0
- cpuDemand = 0.0
- instanceCount = 0
- powerDraw = 0.0
- uptime = 0L
- downtime = 0L
- }
- }
-
- /**
- * An aggregator for server metrics before they are reported.
- */
- private class ServerAggregator(attributes: Attributes) {
- /**
- * The static information about this server.
- */
- val server = ServerInfo(
- attributes[ResourceAttributes.HOST_ID]!!,
- attributes[ResourceAttributes.HOST_NAME]!!,
- attributes[ResourceAttributes.HOST_TYPE]!!,
- attributes[ResourceAttributes.HOST_ARCH]!!,
- attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
- attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
- attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
- attributes[AttributeKey.longKey("host.mem_capacity")]!!,
- )
-
- /**
- * The [HostInfo] of the host on which the server is hosted.
- */
- var host: HostInfo? = null
-
- @JvmField var uptime: Long = 0
- private var previousUptime = 0L
- @JvmField var downtime: Long = 0
- private var previousDowntime = 0L
- @JvmField var schedulingLatency = 0.0
-
- fun record(monitor: ComputeMonitor, now: Long) {
- monitor.record(
- ServerData(
- now,
- server,
- null,
- uptime - previousUptime,
- downtime - previousDowntime,
- )
- )
-
- previousUptime = uptime
- previousDowntime = downtime
- reset()
- }
-
- private fun reset() {
- host = null
- uptime = 0L
- downtime = 0L
- }
- }
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index f3690ee8..25d346fb 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -22,64 +22,31 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
+import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData {
+public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
}
/**
* Extract a [ServiceData] object from the specified list of metric data.
*/
-public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData {
- val resultKey = AttributeKey.stringKey("result")
- val stateKey = AttributeKey.stringKey("state")
-
- var hostsUp = 0
- var hostsDown = 0
-
- var serversPending = 0
- var serversActive = 0
-
- var attemptsSuccess = 0
- var attemptsFailure = 0
- var attemptsError = 0
-
- for (metric in metrics) {
- when (metric.name) {
- "scheduler.hosts" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[stateKey]) {
- "up" -> hostsUp = point.value.toInt()
- "down" -> hostsDown = point.value.toInt()
- }
- }
- }
- "scheduler.servers" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[stateKey]) {
- "pending" -> serversPending = point.value.toInt()
- "active" -> serversActive = point.value.toInt()
- }
- }
- }
- "scheduler.attempts" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[resultKey]) {
- "success" -> attemptsSuccess = point.value.toInt()
- "failure" -> attemptsFailure = point.value.toInt()
- "error" -> attemptsError = point.value.toInt()
- }
- }
- }
+public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+ lateinit var serviceData: ServiceData
+ val agg = ComputeMetricAggregator()
+ val monitor = object : ComputeMonitor {
+ override fun record(data: ServiceData) {
+ serviceData = data
}
}
- return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+ agg.process(metrics)
+ agg.collect(timestamp, monitor)
+ return serviceData
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index e3ecda3d..8e787b97 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -22,20 +22,29 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for a particular host.
*/
public data class HostData(
- public val timestamp: Long,
- public val host: HostInfo,
- public val totalWork: Double,
- public val grantedWork: Double,
- public val overcommittedWork: Double,
- public val interferedWork: Double,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val instanceCount: Int,
- public val powerDraw: Double,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val host: HostInfo,
+ val guestsTerminated: Int,
+ val guestsRunning: Int,
+ val guestsError: Int,
+ val guestsInvalid: Int,
+ val cpuLimit: Double,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val cpuUtilization: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
+ val powerUsage: Double,
+ val powerTotal: Double,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 7fde86d9..c48bff3a 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -22,13 +22,22 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for a particular server.
*/
public data class ServerData(
- public val timestamp: Long,
- public val server: ServerInfo,
- public val host: HostInfo?,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val server: ServerInfo,
+ val host: HostInfo?,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?,
+ val schedulingLatency: Long,
+ val cpuLimit: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index da2ebdf4..6db1399d 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -22,16 +22,18 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for the compute service.
*/
public data class ServiceData(
- public val timestamp: Long,
- public val hostsUp: Int,
- public val hostsDown: Int,
- public val serversPending: Int,
- public val serversActive: Int,
- public val attemptsSuccess: Int,
- public val attemptsFailure: Int,
- public val attemptsError: Int
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
)