summary | refs | log | tree | commit | diff
path: root/opendc-telemetry
diff options
context:
space:
mode:
Diffstat (limited to 'opendc-telemetry')
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt 448
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt 243
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt 55
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt 33
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt 19
-rw-r--r-- opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt 18
-rw-r--r-- opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt 2
7 files changed, 512 insertions, 306 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
new file mode 100644
index 00000000..e9449634
--- /dev/null
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt
@@ -0,0 +1,448 @@
+/*
+ * Copyright (c) 2021 AtLarge Research
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.opendc.telemetry.compute
+
+import io.opentelemetry.api.common.AttributeKey
+import io.opentelemetry.api.common.Attributes
+import io.opentelemetry.sdk.metrics.data.MetricData
+import io.opentelemetry.sdk.metrics.data.PointData
+import io.opentelemetry.sdk.resources.Resource
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
+import org.opendc.telemetry.compute.table.*
+import java.time.Instant
+import kotlin.math.roundToLong
+
+/**
+ * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData].
+ */
+public class ComputeMetricAggregator {
+ private val _service = ServiceAggregator()
+ private val _hosts = mutableMapOf<String, HostAggregator>()
+ private val _servers = mutableMapOf<String, ServerAggregator>()
+
+    /**
+     * Process the specified [metrics] for this cycle.
+     *
+     * Every metric is dispatched on its name:
+     * - `scheduler.hosts`, `scheduler.servers` and `scheduler.attempts` update the service aggregator.
+     * - `scheduler.latency` updates the server identified by the attributes of each point.
+     * - `system.*` metrics are only processed when the resource of the metric carries a host identifier.
+     *   For `system.cpu.limit`, `system.cpu.time`, `system.time` and `system.time.boot`, a point whose
+     *   attributes identify a server is recorded on that server (which is then linked to the host);
+     *   any other point is recorded on the host itself.
+     *
+     * Metrics with an unrecognized name are ignored, as are points with an unrecognized state or result.
+     *
+     * @param metrics The metrics that were exported during this cycle.
+     */
+    public fun process(metrics: Collection<MetricData>) {
+        val service = _service
+        val hosts = _hosts
+        val servers = _servers
+
+        for (metric in metrics) {
+            val resource = metric.resource
+
+            when (metric.name) {
+                // ComputeService
+                "scheduler.hosts" -> {
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[STATE_KEY]) {
+                            "up" -> service.hostsUp = point.value.toInt()
+                            "down" -> service.hostsDown = point.value.toInt()
+                        }
+                    }
+                }
+                "scheduler.servers" -> {
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[STATE_KEY]) {
+                            "pending" -> service.serversPending = point.value.toInt()
+                            "active" -> service.serversActive = point.value.toInt()
+                        }
+                    }
+                }
+                "scheduler.attempts" -> {
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[RESULT_KEY]) {
+                            "success" -> service.attemptsSuccess = point.value.toInt()
+                            "failure" -> service.attemptsFailure = point.value.toInt()
+                            "error" -> service.attemptsError = point.value.toInt()
+                        }
+                    }
+                }
+                "scheduler.latency" -> {
+                    for (point in metric.doubleHistogramData.points) {
+                        val server = getServer(servers, point) ?: continue
+                        // A histogram without samples has no average: the division yields NaN and
+                        // `roundToLong` throws on NaN, which would abort the entire cycle.
+                        if (point.count > 0) {
+                            server.schedulingLatency = (point.sum / point.count).roundToLong()
+                        }
+                    }
+                }
+
+                // SimHost
+                "system.guests" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longSumData.points) {
+                        when (point.attributes[STATE_KEY]) {
+                            "terminated" -> agg.guestsTerminated = point.value.toInt()
+                            "running" -> agg.guestsRunning = point.value.toInt()
+                            // Guests in the error state have their own counter; they must not
+                            // overwrite the number of running guests.
+                            "error" -> agg.guestsError = point.value.toInt()
+                            "invalid" -> agg.guestsInvalid = point.value.toInt()
+                        }
+                    }
+                }
+                "system.cpu.limit" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.doubleGaugeData.points) {
+                        val server = getServer(servers, point)
+
+                        if (server != null) {
+                            server.cpuLimit = point.value
+                            server.host = agg.host
+                        } else {
+                            agg.cpuLimit = point.value
+                        }
+                    }
+                }
+                // The host-level gauges and sums below read only the first point. A metric without
+                // points is skipped rather than failing the cycle.
+                "system.cpu.usage" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.cpuUsage = it.value }
+                }
+                "system.cpu.demand" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.cpuDemand = it.value }
+                }
+                "system.cpu.utilization" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.cpuUtilization = it.value }
+                }
+                "system.cpu.time" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longSumData.points) {
+                        val server = getServer(servers, point)
+                        val state = point.attributes[STATE_KEY]
+                        if (server != null) {
+                            when (state) {
+                                "active" -> server.cpuActiveTime = point.value
+                                "idle" -> server.cpuIdleTime = point.value
+                                "steal" -> server.cpuStealTime = point.value
+                                "lost" -> server.cpuLostTime = point.value
+                            }
+                            server.host = agg.host
+                        } else {
+                            when (state) {
+                                "active" -> agg.cpuActiveTime = point.value
+                                "idle" -> agg.cpuIdleTime = point.value
+                                "steal" -> agg.cpuStealTime = point.value
+                                "lost" -> agg.cpuLostTime = point.value
+                            }
+                        }
+                    }
+                }
+                "system.power.usage" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    metric.doubleGaugeData.points.firstOrNull()?.let { agg.powerUsage = it.value }
+                }
+                "system.power.total" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+                    metric.doubleSumData.points.firstOrNull()?.let { agg.powerTotal = it.value }
+                }
+                "system.time" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longSumData.points) {
+                        val server = getServer(servers, point)
+
+                        if (server != null) {
+                            when (point.attributes[STATE_KEY]) {
+                                "up" -> server.uptime = point.value
+                                "down" -> server.downtime = point.value
+                            }
+                            server.host = agg.host
+                        } else {
+                            when (point.attributes[STATE_KEY]) {
+                                "up" -> agg.uptime = point.value
+                                "down" -> agg.downtime = point.value
+                            }
+                        }
+                    }
+                }
+                "system.time.boot" -> {
+                    val agg = getHost(hosts, resource) ?: continue
+
+                    for (point in metric.longGaugeData.points) {
+                        val server = getServer(servers, point)
+
+                        if (server != null) {
+                            server.bootTime = point.value
+                            server.host = agg.host
+                        } else {
+                            agg.bootTime = point.value
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Report the aggregated state to the [monitor], stamping every record with [now].
+     *
+     * The service record is emitted first, followed by one record per known host and then one record
+     * per known server.
+     */
+    public fun collect(now: Instant, monitor: ComputeMonitor) {
+        val serviceData = _service.collect(now)
+        monitor.record(serviceData)
+
+        _hosts.values.forEach { hostAgg -> monitor.record(hostAgg.collect(now)) }
+        _servers.values.forEach { serverAgg -> monitor.record(serverAgg.collect(now)) }
+    }
+
+    /**
+     * Look up the [HostAggregator] that belongs to [resource], creating it on first use.
+     *
+     * @return The aggregator, or `null` when [resource] has no host identifier attribute.
+     */
+    private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? {
+        val hostId = resource.attributes[HOST_ID] ?: return null
+        return hosts.computeIfAbsent(hostId) { HostAggregator(resource) }
+    }
+
+    /**
+     * Look up the [ServerAggregator] that belongs to [point], creating it on first use.
+     *
+     * @return The aggregator, or `null` when the attributes of [point] carry no host identifier.
+     */
+    private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? {
+        val attributes = point.attributes
+        val serverId = attributes[ResourceAttributes.HOST_ID] ?: return null
+        return servers.computeIfAbsent(serverId) { ServerAggregator(attributes) }
+    }
+
+    /**
+     * Holds the service-level counters until they are reported.
+     */
+    internal class ServiceAggregator {
+        @JvmField var hostsUp = 0
+        @JvmField var hostsDown = 0
+
+        @JvmField var serversPending = 0
+        @JvmField var serversActive = 0
+
+        @JvmField var attemptsSuccess = 0
+        @JvmField var attemptsFailure = 0
+        @JvmField var attemptsError = 0
+
+        /**
+         * Finish the aggregation for this cycle by copying the current counters into an immutable
+         * [ServiceData] stamped with [now]. The counters themselves are left untouched.
+         */
+        fun collect(now: Instant): ServiceData = ServiceData(
+            timestamp = now,
+            hostsUp = hostsUp,
+            hostsDown = hostsDown,
+            serversPending = serversPending,
+            serversActive = serversActive,
+            attemptsSuccess = attemptsSuccess,
+            attemptsFailure = attemptsFailure,
+            attemptsError = attemptsError
+        )
+    }
+
+    /**
+     * Holds the metrics of a single host until they are reported.
+     */
+    internal class HostAggregator(resource: Resource) {
+        /**
+         * The static information about this host, read from the attributes of the resource. Only the
+         * identifier is mandatory; the remaining attributes fall back to an empty string or zero.
+         */
+        val host = resource.attributes.let { attrs ->
+            HostInfo(
+                attrs[HOST_ID]!!,
+                attrs[HOST_NAME] ?: "",
+                attrs[HOST_ARCH] ?: "",
+                attrs[HOST_NCPUS]?.toInt() ?: 0,
+                attrs[HOST_MEM_CAPACITY] ?: 0,
+            )
+        }
+
+        // Values that are reported as-is and cleared after every cycle.
+        @JvmField var guestsTerminated = 0
+        @JvmField var guestsRunning = 0
+        @JvmField var guestsError = 0
+        @JvmField var guestsInvalid = 0
+
+        @JvmField var cpuLimit = 0.0
+        @JvmField var cpuUsage = 0.0
+        @JvmField var cpuDemand = 0.0
+        @JvmField var cpuUtilization = 0.0
+
+        @JvmField var powerUsage = 0.0
+
+        // Running totals; each is reported as the difference with the value seen at the previous cycle.
+        @JvmField var cpuActiveTime = 0L
+        @JvmField var cpuIdleTime = 0L
+        @JvmField var cpuStealTime = 0L
+        @JvmField var cpuLostTime = 0L
+        @JvmField var powerTotal = 0.0
+        @JvmField var uptime = 0L
+        @JvmField var downtime = 0L
+
+        // `Long.MIN_VALUE` marks a boot time that has not been observed.
+        @JvmField var bootTime = Long.MIN_VALUE
+
+        // Totals as they were at the end of the previous cycle.
+        private var lastCpuActiveTime = 0L
+        private var lastCpuIdleTime = 0L
+        private var lastCpuStealTime = 0L
+        private var lastCpuLostTime = 0L
+        private var lastPowerTotal = 0.0
+        private var lastUptime = 0L
+        private var lastDowntime = 0L
+
+        /**
+         * Finish the aggregation for this cycle: build the [HostData] for [now], remember the running
+         * totals for the next cycle and clear the per-cycle values.
+         */
+        fun collect(now: Instant): HostData {
+            // The snapshot must be taken before the bookkeeping below overwrites the previous totals.
+            val snapshot = HostData(
+                timestamp = now,
+                host = host,
+                guestsTerminated = guestsTerminated,
+                guestsRunning = guestsRunning,
+                guestsError = guestsError,
+                guestsInvalid = guestsInvalid,
+                cpuLimit = cpuLimit,
+                cpuUsage = cpuUsage,
+                cpuDemand = cpuDemand,
+                cpuUtilization = cpuUtilization,
+                cpuActiveTime = cpuActiveTime - lastCpuActiveTime,
+                cpuIdleTime = cpuIdleTime - lastCpuIdleTime,
+                cpuStealTime = cpuStealTime - lastCpuStealTime,
+                cpuLostTime = cpuLostTime - lastCpuLostTime,
+                powerUsage = powerUsage,
+                powerTotal = powerTotal - lastPowerTotal,
+                uptime = uptime - lastUptime,
+                downtime = downtime - lastDowntime,
+                bootTime = bootTime.takeIf { it != Long.MIN_VALUE }?.let(Instant::ofEpochMilli)
+            )
+
+            // Remember the running totals so the next cycle reports only its own increase.
+            lastCpuActiveTime = cpuActiveTime
+            lastCpuIdleTime = cpuIdleTime
+            lastCpuStealTime = cpuStealTime
+            lastCpuLostTime = cpuLostTime
+            lastPowerTotal = powerTotal
+            lastUptime = uptime
+            lastDowntime = downtime
+
+            // Clear the values that are only valid for a single cycle.
+            guestsTerminated = 0
+            guestsRunning = 0
+            guestsError = 0
+            guestsInvalid = 0
+
+            cpuLimit = 0.0
+            cpuUsage = 0.0
+            cpuDemand = 0.0
+            cpuUtilization = 0.0
+
+            powerUsage = 0.0
+
+            return snapshot
+        }
+    }
+
+    /**
+     * An aggregator for server metrics before they are reported.
+     *
+     * @param attributes The attributes of the first point that was observed for the server. All server
+     * attributes read below are mandatory; construction fails with a [NullPointerException] if one is absent.
+     */
+    internal class ServerAggregator(attributes: Attributes) {
+        /**
+         * The static information about this server.
+         */
+        val server = ServerInfo(
+            attributes[ResourceAttributes.HOST_ID]!!,
+            attributes[ResourceAttributes.HOST_NAME]!!,
+            attributes[ResourceAttributes.HOST_TYPE]!!,
+            attributes[ResourceAttributes.HOST_ARCH]!!,
+            attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
+            attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
+            attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
+            attributes[AttributeKey.longKey("host.mem_capacity")]!!,
+        )
+
+        /**
+         * The [HostInfo] of the host on which the server is hosted. Cleared after every cycle.
+         */
+        var host: HostInfo? = null
+
+        // Running totals; each is reported as the difference with the value seen at the previous cycle.
+        @JvmField var uptime: Long = 0
+        private var previousUptime = 0L
+        @JvmField var downtime: Long = 0
+        private var previousDowntime = 0L
+
+        /**
+         * The boot time of the server in epoch milliseconds, or [Long.MIN_VALUE] while no boot time has
+         * been observed. The sentinel must be the initial value: [toServerData] compares against it to
+         * decide whether to report `null`, so starting at zero would report the Unix epoch instead.
+         */
+        @JvmField var bootTime: Long = Long.MIN_VALUE
+        @JvmField var schedulingLatency = 0L
+        @JvmField var cpuLimit = 0.0
+        @JvmField var cpuActiveTime = 0L
+        @JvmField var cpuIdleTime = 0L
+        @JvmField var cpuStealTime = 0L
+        @JvmField var cpuLostTime = 0L
+        private var previousCpuActiveTime = 0L
+        private var previousCpuIdleTime = 0L
+        private var previousCpuStealTime = 0L
+        private var previousCpuLostTime = 0L
+
+        /**
+         * Finish the aggregation for this cycle: build the [ServerData] for [now], remember the running
+         * totals for the next cycle and clear the host link and CPU limit.
+         */
+        fun collect(now: Instant): ServerData {
+            // Must run before the previous totals are overwritten below.
+            val data = toServerData(now)
+
+            previousUptime = uptime
+            previousDowntime = downtime
+            previousCpuActiveTime = cpuActiveTime
+            previousCpuIdleTime = cpuIdleTime
+            previousCpuStealTime = cpuStealTime
+            previousCpuLostTime = cpuLostTime
+
+            host = null
+            cpuLimit = 0.0
+
+            return data
+        }
+
+        /**
+         * Convert the aggregator state into an immutable [ServerData].
+         */
+        private fun toServerData(now: Instant): ServerData {
+            return ServerData(
+                now,
+                server,
+                host,
+                uptime - previousUptime,
+                downtime - previousDowntime,
+                if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null,
+                schedulingLatency,
+                cpuLimit,
+                cpuActiveTime - previousCpuActiveTime,
+                cpuIdleTime - previousCpuIdleTime,
+                cpuStealTime - previousCpuStealTime,
+                cpuLostTime - previousCpuLostTime
+            )
+        }
+    }
+
+    private companion object {
+        /** Attribute key under which a point carries the state it describes. */
+        private val STATE_KEY: AttributeKey<String> = AttributeKey.stringKey("state")
+
+        /** Attribute key under which a scheduling attempt point carries its result. */
+        private val RESULT_KEY: AttributeKey<String> = AttributeKey.stringKey("result")
+    }
+}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
index 408d1325..ea96f721 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt
@@ -22,28 +22,24 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.api.common.AttributeKey
-import io.opentelemetry.api.common.Attributes
import io.opentelemetry.sdk.common.CompletableResultCode
import io.opentelemetry.sdk.metrics.data.*
import io.opentelemetry.sdk.metrics.export.MetricExporter
-import io.opentelemetry.sdk.resources.Resource
-import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
-import org.opendc.telemetry.compute.table.HostData
-import org.opendc.telemetry.compute.table.HostInfo
-import org.opendc.telemetry.compute.table.ServerData
-import org.opendc.telemetry.compute.table.ServerInfo
import java.time.Clock
/**
* A [MetricExporter] that redirects data to a [ComputeMonitor] implementation.
*/
public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter {
+ /**
+ * A [ComputeMetricAggregator] that actually performs the aggregation.
+ */
+ private val agg = ComputeMetricAggregator()
+
override fun export(metrics: Collection<MetricData>): CompletableResultCode {
return try {
- reportServiceMetrics(metrics)
- reportHostMetrics(metrics)
- reportServerMetrics(metrics)
+ agg.process(metrics)
+ agg.collect(clock.instant(), monitor)
CompletableResultCode.ofSuccess()
} catch (e: Throwable) {
CompletableResultCode.ofFailure()
@@ -53,229 +49,4 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor
override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess()
override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess()
-
- private fun reportServiceMetrics(metrics: Collection<MetricData>) {
- monitor.record(extractServiceMetrics(clock.millis(), metrics))
- }
-
- private val hosts = mutableMapOf<String, HostAggregator>()
- private val servers = mutableMapOf<String, ServerAggregator>()
-
- private fun reportHostMetrics(metrics: Collection<MetricData>) {
- val hosts = hosts
- val servers = servers
-
- for (metric in metrics) {
- val resource = metric.resource
- val hostId = resource.attributes[HOST_ID] ?: continue
- val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) }
- agg.accept(metric)
- }
-
- val monitor = monitor
- val now = clock.millis()
- for ((_, server) in servers) {
- server.record(monitor, now)
- }
- }
-
- private fun reportServerMetrics(metrics: Collection<MetricData>) {
- val hosts = hosts
-
- for (metric in metrics) {
- val resource = metric.resource
- val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host }
-
- when (metric.name) {
- "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point ->
- agg.schedulingLatency = point.sum / point.count
- }
- "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point ->
- agg.uptime = point.value
- }
- "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point ->
- agg.downtime = point.value
- }
- }
- }
-
- val monitor = monitor
- val now = clock.millis()
- for ((_, host) in hosts) {
- host.record(monitor, now)
- }
- }
-
- /**
- * Helper function to map a metric by the server.
- */
- private inline fun <P : PointData> mapByServer(points: Collection<P>, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) {
- for (point in points) {
- val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue
- val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) }
-
- if (host != null) {
- agg.host = host
- }
-
- block(agg, point)
- }
- }
-
- /**
- * An aggregator for host metrics before they are reported.
- */
- private class HostAggregator(resource: Resource) {
- /**
- * The static information about this host.
- */
- val host = HostInfo(
- resource.attributes[HOST_ID]!!,
- resource.attributes[HOST_NAME]!!,
- resource.attributes[HOST_ARCH]!!,
- resource.attributes[HOST_NCPUS]!!.toInt(),
- resource.attributes[HOST_MEM_CAPACITY]!!,
- )
-
- private var totalWork: Double = 0.0
- private var previousTotalWork = 0.0
- private var grantedWork: Double = 0.0
- private var previousGrantedWork = 0.0
- private var overcommittedWork: Double = 0.0
- private var previousOvercommittedWork = 0.0
- private var interferedWork: Double = 0.0
- private var previousInterferedWork = 0.0
- private var cpuUsage: Double = 0.0
- private var cpuDemand: Double = 0.0
- private var instanceCount: Int = 0
- private var powerDraw: Double = 0.0
- private var uptime: Long = 0
- private var previousUptime = 0L
- private var downtime: Long = 0
- private var previousDowntime = 0L
-
- fun record(monitor: ComputeMonitor, now: Long) {
- monitor.record(
- HostData(
- now,
- host,
- totalWork - previousTotalWork,
- grantedWork - previousGrantedWork,
- overcommittedWork - previousOvercommittedWork,
- interferedWork - previousInterferedWork,
- cpuUsage,
- cpuDemand,
- instanceCount,
- powerDraw,
- uptime - previousUptime,
- downtime - previousDowntime,
- )
- )
-
- previousTotalWork = totalWork
- previousGrantedWork = grantedWork
- previousOvercommittedWork = overcommittedWork
- previousInterferedWork = interferedWork
- previousUptime = uptime
- previousDowntime = downtime
- reset()
- }
-
- /**
- * Accept the [MetricData] for this host.
- */
- fun accept(data: MetricData) {
- when (data.name) {
- "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value
- "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value
- "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value
- "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value
- "power.usage" -> powerDraw = acceptHistogram(data)
- "cpu.usage" -> cpuUsage = acceptHistogram(data)
- "cpu.demand" -> cpuDemand = acceptHistogram(data)
- "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt()
- "host.time.up" -> uptime = data.longSumData.points.first().value
- "host.time.down" -> downtime = data.longSumData.points.first().value
- }
- }
-
- private fun acceptHistogram(data: MetricData): Double {
- return when (data.type) {
- MetricDataType.HISTOGRAM -> {
- val point = data.doubleHistogramData.points.first()
- point.sum / point.count
- }
- MetricDataType.SUMMARY -> {
- val point = data.doubleSummaryData.points.first()
- point.sum / point.count
- }
- else -> error("Invalid metric type")
- }
- }
-
- private fun reset() {
- totalWork = 0.0
- grantedWork = 0.0
- overcommittedWork = 0.0
- interferedWork = 0.0
- cpuUsage = 0.0
- cpuDemand = 0.0
- instanceCount = 0
- powerDraw = 0.0
- uptime = 0L
- downtime = 0L
- }
- }
-
- /**
- * An aggregator for server metrics before they are reported.
- */
- private class ServerAggregator(attributes: Attributes) {
- /**
- * The static information about this server.
- */
- val server = ServerInfo(
- attributes[ResourceAttributes.HOST_ID]!!,
- attributes[ResourceAttributes.HOST_NAME]!!,
- attributes[ResourceAttributes.HOST_TYPE]!!,
- attributes[ResourceAttributes.HOST_ARCH]!!,
- attributes[ResourceAttributes.HOST_IMAGE_ID]!!,
- attributes[ResourceAttributes.HOST_IMAGE_NAME]!!,
- attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(),
- attributes[AttributeKey.longKey("host.mem_capacity")]!!,
- )
-
- /**
- * The [HostInfo] of the host on which the server is hosted.
- */
- var host: HostInfo? = null
-
- @JvmField var uptime: Long = 0
- private var previousUptime = 0L
- @JvmField var downtime: Long = 0
- private var previousDowntime = 0L
- @JvmField var schedulingLatency = 0.0
-
- fun record(monitor: ComputeMonitor, now: Long) {
- monitor.record(
- ServerData(
- now,
- server,
- null,
- uptime - previousUptime,
- downtime - previousDowntime,
- )
- )
-
- previousUptime = uptime
- previousDowntime = downtime
- reset()
- }
-
- private fun reset() {
- host = null
- uptime = 0L
- downtime = 0L
- }
- }
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
index f3690ee8..25d346fb 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt
@@ -22,64 +22,31 @@
package org.opendc.telemetry.compute
-import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.MetricData
import io.opentelemetry.sdk.metrics.export.MetricProducer
import org.opendc.telemetry.compute.table.ServiceData
+import java.time.Instant
/**
* Collect the metrics of the compute service.
*/
-public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData {
+public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData {
return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics())
}
/**
* Extract a [ServiceData] object from the specified list of metric data.
*/
-public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData {
- val resultKey = AttributeKey.stringKey("result")
- val stateKey = AttributeKey.stringKey("state")
-
- var hostsUp = 0
- var hostsDown = 0
-
- var serversPending = 0
- var serversActive = 0
-
- var attemptsSuccess = 0
- var attemptsFailure = 0
- var attemptsError = 0
-
- for (metric in metrics) {
- when (metric.name) {
- "scheduler.hosts" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[stateKey]) {
- "up" -> hostsUp = point.value.toInt()
- "down" -> hostsDown = point.value.toInt()
- }
- }
- }
- "scheduler.servers" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[stateKey]) {
- "pending" -> serversPending = point.value.toInt()
- "active" -> serversActive = point.value.toInt()
- }
- }
- }
- "scheduler.attempts" -> {
- for (point in metric.longSumData.points) {
- when (point.attributes[resultKey]) {
- "success" -> attemptsSuccess = point.value.toInt()
- "failure" -> attemptsFailure = point.value.toInt()
- "error" -> attemptsError = point.value.toInt()
- }
- }
- }
+public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData {
+ lateinit var serviceData: ServiceData
+ val agg = ComputeMetricAggregator()
+ val monitor = object : ComputeMonitor {
+ override fun record(data: ServiceData) {
+ serviceData = data
}
}
- return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError)
+ agg.process(metrics)
+ agg.collect(timestamp, monitor)
+ return serviceData
}
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
index e3ecda3d..8e787b97 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt
@@ -22,20 +22,29 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for a particular host.
*/
public data class HostData(
- public val timestamp: Long,
- public val host: HostInfo,
- public val totalWork: Double,
- public val grantedWork: Double,
- public val overcommittedWork: Double,
- public val interferedWork: Double,
- public val cpuUsage: Double,
- public val cpuDemand: Double,
- public val instanceCount: Int,
- public val powerDraw: Double,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val host: HostInfo,
+ val guestsTerminated: Int,
+ val guestsRunning: Int,
+ val guestsError: Int,
+ val guestsInvalid: Int,
+ val cpuLimit: Double,
+ val cpuUsage: Double,
+ val cpuDemand: Double,
+ val cpuUtilization: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
+ val powerUsage: Double,
+ val powerTotal: Double,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
index 7fde86d9..c48bff3a 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt
@@ -22,13 +22,22 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for a particular server.
*/
public data class ServerData(
- public val timestamp: Long,
- public val server: ServerInfo,
- public val host: HostInfo?,
- public val uptime: Long,
- public val downtime: Long,
+ val timestamp: Instant,
+ val server: ServerInfo,
+ val host: HostInfo?,
+ val uptime: Long,
+ val downtime: Long,
+ val bootTime: Instant?,
+ val schedulingLatency: Long,
+ val cpuLimit: Double,
+ val cpuActiveTime: Long,
+ val cpuIdleTime: Long,
+ val cpuStealTime: Long,
+ val cpuLostTime: Long,
)
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
index da2ebdf4..6db1399d 100644
--- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
+++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt
@@ -22,16 +22,18 @@
package org.opendc.telemetry.compute.table
+import java.time.Instant
+
/**
* A trace entry for the compute service.
*/
public data class ServiceData(
- public val timestamp: Long,
- public val hostsUp: Int,
- public val hostsDown: Int,
- public val serversPending: Int,
- public val serversActive: Int,
- public val attemptsSuccess: Int,
- public val attemptsFailure: Int,
- public val attemptsError: Int
+ val timestamp: Instant,
+ val hostsUp: Int,
+ val hostsDown: Int,
+ val serversPending: Int,
+ val serversActive: Int,
+ val attemptsSuccess: Int,
+ val attemptsFailure: Int,
+ val attemptsError: Int
)
diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
index 8f19ab81..07f0ff7f 100644
--- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
+++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt
@@ -44,7 +44,7 @@ public class CoroutineMetricReader(
scope: CoroutineScope,
private val producers: List<MetricProducer>,
private val exporter: MetricExporter,
- private val exportInterval: Duration = Duration.ofMinutes(1)
+ private val exportInterval: Duration = Duration.ofMinutes(5)
) : AutoCloseable {
private val logger = KotlinLogging.logger {}