diff options
Diffstat (limited to 'opendc-telemetry')
7 files changed, 512 insertions, 306 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt new file mode 100644 index 00000000..e9449634 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -0,0 +1,448 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.data.PointData +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.opendc.telemetry.compute.table.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. + */ +public class ComputeMetricAggregator { + private val _service = ServiceAggregator() + private val _hosts = mutableMapOf<String, HostAggregator>() + private val _servers = mutableMapOf<String, ServerAggregator>() + + /** + * Process the specified [metrics] for this cycle. + */ + public fun process(metrics: Collection<MetricData>) { + val service = _service + val hosts = _hosts + val servers = _servers + + for (metric in metrics) { + val resource = metric.resource + + when (metric.name) { + // ComputeService + "scheduler.hosts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "up" -> service.hostsUp = point.value.toInt() + "down" -> service.hostsDown = point.value.toInt() + } + } + } + "scheduler.servers" -> { + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "pending" -> service.serversPending = point.value.toInt() + "active" -> service.serversActive = point.value.toInt() + } + } + } + "scheduler.attempts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[RESULT_KEY]) { + "success" -> service.attemptsSuccess = point.value.toInt() + "failure" -> service.attemptsFailure = point.value.toInt() + "error" -> service.attemptsError = point.value.toInt() + } + } + } + "scheduler.latency" -> { + for (point in metric.doubleHistogramData.points) { + val server = getServer(servers, point) ?: continue + server.schedulingLatency = (point.sum / point.count).roundToLong() + } + } + + // SimHost + "system.guests" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "terminated" -> agg.guestsTerminated = point.value.toInt() + "running" -> agg.guestsRunning = point.value.toInt() + "error" -> agg.guestsRunning = point.value.toInt() + "invalid" -> agg.guestsInvalid = point.value.toInt() + } + } + } + "system.cpu.limit" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.doubleGaugeData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.cpuLimit = point.value + server.host = agg.host + } else { + agg.cpuLimit = point.value + } + } + } + "system.cpu.usage" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuUsage = metric.doubleGaugeData.points.first().value + } + "system.cpu.demand" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuDemand = metric.doubleGaugeData.points.first().value + } + "system.cpu.utilization" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuUtilization = metric.doubleGaugeData.points.first().value + } + "system.cpu.time" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + val server = getServer(servers, point) + val state = point.attributes[STATE_KEY] + if (server != null) { + when (state) { + "active" -> server.cpuActiveTime = point.value + "idle" -> server.cpuIdleTime = point.value + "steal" -> server.cpuStealTime = point.value + "lost" -> server.cpuLostTime = point.value + } + server.host = agg.host + } else { + when (state) { + "active" -> agg.cpuActiveTime = point.value + "idle" -> agg.cpuIdleTime = point.value + "steal" -> agg.cpuStealTime = point.value + "lost" -> agg.cpuLostTime = point.value + } + } + } + } + "system.power.usage" -> { + val agg = getHost(hosts, resource) ?: continue + agg.powerUsage = metric.doubleGaugeData.points.first().value + } + "system.power.total" -> { + val agg = getHost(hosts, resource) ?: continue + agg.powerTotal = metric.doubleSumData.points.first().value + } + "system.time" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + val server = getServer(servers, point) + + if (server != null) { + when (point.attributes[STATE_KEY]) { + "up" -> server.uptime = point.value + "down" -> server.downtime = point.value + } + server.host = agg.host + } else { + when (point.attributes[STATE_KEY]) { + "up" -> agg.uptime = point.value + "down" -> agg.downtime = point.value + } + } + } + } + "system.time.boot" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longGaugeData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.bootTime = point.value + server.host = agg.host + } else { + agg.bootTime = point.value + } + } + } + } + } + } + + /** + * Collect the data via the [monitor]. + */ + public fun collect(now: Instant, monitor: ComputeMonitor) { + monitor.record(_service.collect(now)) + + for (host in _hosts.values) { + monitor.record(host.collect(now)) + } + + for (server in _servers.values) { + monitor.record(server.collect(now)) + } + } + + /** + * Obtain the [HostAggregator] for the specified [resource]. + */ + private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? { + val id = resource.attributes[HOST_ID] + return if (id != null) { + hosts.computeIfAbsent(id) { HostAggregator(resource) } + } else { + null + } + } + + /** + * Obtain the [ServerAggregator] for the specified [point]. + */ + private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? { + val id = point.attributes[ResourceAttributes.HOST_ID] + return if (id != null) { + servers.computeIfAbsent(id) { ServerAggregator(point.attributes) } + } else { + null + } + } + + /** + * An aggregator for service metrics before they are reported. + */ + internal class ServiceAggregator { + @JvmField var hostsUp = 0 + @JvmField var hostsDown = 0 + + @JvmField var serversPending = 0 + @JvmField var serversActive = 0 + + @JvmField var attemptsSuccess = 0 + @JvmField var attemptsFailure = 0 + @JvmField var attemptsError = 0 + + /** + * Finish the aggregation for this cycle. + */ + fun collect(now: Instant): ServiceData = toServiceData(now) + + /** + * Convert the aggregator state to an immutable [ServiceData]. + */ + private fun toServiceData(now: Instant): ServiceData { + return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + internal class HostAggregator(resource: Resource) { + /** + * The static information about this host. + */ + val host = HostInfo( + resource.attributes[HOST_ID]!!, + resource.attributes[HOST_NAME] ?: "", + resource.attributes[HOST_ARCH] ?: "", + resource.attributes[HOST_NCPUS]?.toInt() ?: 0, + resource.attributes[HOST_MEM_CAPACITY] ?: 0, + ) + + @JvmField var guestsTerminated = 0 + @JvmField var guestsRunning = 0 + @JvmField var guestsError = 0 + @JvmField var guestsInvalid = 0 + + @JvmField var cpuLimit = 0.0 + @JvmField var cpuUsage = 0.0 + @JvmField var cpuDemand = 0.0 + @JvmField var cpuUtilization = 0.0 + + @JvmField var cpuActiveTime = 0L + @JvmField var cpuIdleTime = 0L + @JvmField var cpuStealTime = 0L + @JvmField var cpuLostTime = 0L + private var previousCpuActiveTime = 0L + private var previousCpuIdleTime = 0L + private var previousCpuStealTime = 0L + private var previousCpuLostTime = 0L + + @JvmField var powerUsage = 0.0 + @JvmField var powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + @JvmField var uptime = 0L + private var previousUptime = 0L + @JvmField var downtime = 0L + private var previousDowntime = 0L + @JvmField var bootTime = Long.MIN_VALUE + + /** + * Finish the aggregation for this cycle. + */ + fun collect(now: Instant): HostData { + val data = toHostData(now) + + // Reset intermediate state for next aggregation + previousCpuActiveTime = cpuActiveTime + previousCpuIdleTime = cpuIdleTime + previousCpuStealTime = cpuStealTime + previousCpuLostTime = cpuLostTime + previousPowerTotal = powerTotal + previousUptime = uptime + previousDowntime = downtime + + guestsTerminated = 0 + guestsRunning = 0 + guestsError = 0 + guestsInvalid = 0 + + cpuLimit = 0.0 + cpuUsage = 0.0 + cpuDemand = 0.0 + cpuUtilization = 0.0 + + powerUsage = 0.0 + + return data + } + + /** + * Convert the aggregator state to an immutable [HostData] instance. + */ + private fun toHostData(now: Instant): HostData { + return HostData( + now, + host, + guestsTerminated, + guestsRunning, + guestsError, + guestsInvalid, + cpuLimit, + cpuUsage, + cpuDemand, + cpuUtilization, + cpuActiveTime - previousCpuActiveTime, + cpuIdleTime - previousCpuIdleTime, + cpuStealTime - previousCpuStealTime, + cpuLostTime - previousCpuLostTime, + powerUsage, + powerTotal - previousPowerTotal, + uptime - previousUptime, + downtime - previousDowntime, + if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null + ) + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + internal class ServerAggregator(attributes: Attributes) { + /** + * The static information about this server. + */ + val server = ServerInfo( + attributes[ResourceAttributes.HOST_ID]!!, + attributes[ResourceAttributes.HOST_NAME]!!, + attributes[ResourceAttributes.HOST_TYPE]!!, + attributes[ResourceAttributes.HOST_ARCH]!!, + attributes[ResourceAttributes.HOST_IMAGE_ID]!!, + attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, + attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), + attributes[AttributeKey.longKey("host.mem_capacity")]!!, + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + var host: HostInfo? = null + + @JvmField var uptime: Long = 0 + private var previousUptime = 0L + @JvmField var downtime: Long = 0 + private var previousDowntime = 0L + @JvmField var bootTime: Long = 0 + @JvmField var schedulingLatency = 0L + @JvmField var cpuLimit = 0.0 + @JvmField var cpuActiveTime = 0L + @JvmField var cpuIdleTime = 0L + @JvmField var cpuStealTime = 0L + @JvmField var cpuLostTime = 0L + private var previousCpuActiveTime = 0L + private var previousCpuIdleTime = 0L + private var previousCpuStealTime = 0L + private var previousCpuLostTime = 0L + + /** + * Finish the aggregation for this cycle. + */ + fun collect(now: Instant): ServerData { + val data = toServerData(now) + + previousUptime = uptime + previousDowntime = downtime + previousCpuActiveTime = cpuActiveTime + previousCpuIdleTime = cpuIdleTime + previousCpuStealTime = cpuStealTime + previousCpuLostTime = cpuLostTime + + host = null + cpuLimit = 0.0 + + return data + } + + /** + * Convert the aggregator state into an immutable [ServerData]. + */ + private fun toServerData(now: Instant): ServerData { + return ServerData( + now, + server, + host, + uptime - previousUptime, + downtime - previousDowntime, + if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null, + schedulingLatency, + cpuLimit, + cpuActiveTime - previousCpuActiveTime, + cpuIdleTime - previousCpuIdleTime, + cpuStealTime - previousCpuStealTime, + cpuLostTime - previousCpuLostTime + ) + } + } + + private companion object { + private val STATE_KEY = AttributeKey.stringKey("state") + private val RESULT_KEY = AttributeKey.stringKey("result") + } +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 408d1325..ea96f721 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -22,28 +22,24 @@ package org.opendc.telemetry.compute -import io.opentelemetry.api.common.AttributeKey -import io.opentelemetry.api.common.Attributes import io.opentelemetry.sdk.common.CompletableResultCode import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter -import io.opentelemetry.sdk.resources.Resource -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.telemetry.compute.table.HostData -import org.opendc.telemetry.compute.table.HostInfo -import org.opendc.telemetry.compute.table.ServerData -import org.opendc.telemetry.compute.table.ServerInfo import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { + /** + * A [ComputeMetricAggregator] that actually performs the aggregation. + */ + private val agg = ComputeMetricAggregator() + override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { - reportServiceMetrics(metrics) - reportHostMetrics(metrics) - reportServerMetrics(metrics) + agg.process(metrics) + agg.collect(clock.instant(), monitor) CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() @@ -53,229 +49,4 @@ public class ComputeMetricExporter(private val clock: Clock, private val monitor override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() - - private fun reportServiceMetrics(metrics: Collection<MetricData>) { - monitor.record(extractServiceMetrics(clock.millis(), metrics)) - } - - private val hosts = mutableMapOf<String, HostAggregator>() - private val servers = mutableMapOf<String, ServerAggregator>() - - private fun reportHostMetrics(metrics: Collection<MetricData>) { - val hosts = hosts - val servers = servers - - for (metric in metrics) { - val resource = metric.resource - val hostId = resource.attributes[HOST_ID] ?: continue - val agg = hosts.computeIfAbsent(hostId) { HostAggregator(resource) } - agg.accept(metric) - } - - val monitor = monitor - val now = clock.millis() - for ((_, server) in servers) { - server.record(monitor, now) - } - } - - private fun reportServerMetrics(metrics: Collection<MetricData>) { - val hosts = hosts - - for (metric in metrics) { - val resource = metric.resource - val host = resource.attributes[HOST_ID]?.let { hosts[it]?.host } - - when (metric.name) { - "scheduler.duration" -> mapByServer(metric.doubleHistogramData.points, host) { agg, point -> - agg.schedulingLatency = point.sum / point.count - } - "guest.time.running" -> mapByServer(metric.longSumData.points, host) { agg, point -> - agg.uptime = point.value - } - "guest.time.error" -> mapByServer(metric.longSumData.points, host) { agg, point -> - agg.downtime = point.value - } - } - } - - val monitor = monitor - val now = clock.millis() - for ((_, host) in hosts) { - host.record(monitor, now) - } - } - - /** - * Helper function to map a metric by the server. - */ - private inline fun <P : PointData> mapByServer(points: Collection<P>, host: HostInfo? = null, block: (ServerAggregator, P) -> Unit) { - for (point in points) { - val serverId = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val agg = servers.computeIfAbsent(serverId) { ServerAggregator(point.attributes) } - - if (host != null) { - agg.host = host - } - - block(agg, point) - } - } - - /** - * An aggregator for host metrics before they are reported. - */ - private class HostAggregator(resource: Resource) { - /** - * The static information about this host. - */ - val host = HostInfo( - resource.attributes[HOST_ID]!!, - resource.attributes[HOST_NAME]!!, - resource.attributes[HOST_ARCH]!!, - resource.attributes[HOST_NCPUS]!!.toInt(), - resource.attributes[HOST_MEM_CAPACITY]!!, - ) - - private var totalWork: Double = 0.0 - private var previousTotalWork = 0.0 - private var grantedWork: Double = 0.0 - private var previousGrantedWork = 0.0 - private var overcommittedWork: Double = 0.0 - private var previousOvercommittedWork = 0.0 - private var interferedWork: Double = 0.0 - private var previousInterferedWork = 0.0 - private var cpuUsage: Double = 0.0 - private var cpuDemand: Double = 0.0 - private var instanceCount: Int = 0 - private var powerDraw: Double = 0.0 - private var uptime: Long = 0 - private var previousUptime = 0L - private var downtime: Long = 0 - private var previousDowntime = 0L - - fun record(monitor: ComputeMonitor, now: Long) { - monitor.record( - HostData( - now, - host, - totalWork - previousTotalWork, - grantedWork - previousGrantedWork, - overcommittedWork - previousOvercommittedWork, - interferedWork - previousInterferedWork, - cpuUsage, - cpuDemand, - instanceCount, - powerDraw, - uptime - previousUptime, - downtime - previousDowntime, - ) - ) - - previousTotalWork = totalWork - previousGrantedWork = grantedWork - previousOvercommittedWork = overcommittedWork - previousInterferedWork = interferedWork - previousUptime = uptime - previousDowntime = downtime - reset() - } - - /** - * Accept the [MetricData] for this host. - */ - fun accept(data: MetricData) { - when (data.name) { - "cpu.work.total" -> totalWork = data.doubleSumData.points.first().value - "cpu.work.granted" -> grantedWork = data.doubleSumData.points.first().value - "cpu.work.overcommit" -> overcommittedWork = data.doubleSumData.points.first().value - "cpu.work.interference" -> interferedWork = data.doubleSumData.points.first().value - "power.usage" -> powerDraw = acceptHistogram(data) - "cpu.usage" -> cpuUsage = acceptHistogram(data) - "cpu.demand" -> cpuDemand = acceptHistogram(data) - "guests.active" -> instanceCount = data.longSumData.points.first().value.toInt() - "host.time.up" -> uptime = data.longSumData.points.first().value - "host.time.down" -> downtime = data.longSumData.points.first().value - } - } - - private fun acceptHistogram(data: MetricData): Double { - return when (data.type) { - MetricDataType.HISTOGRAM -> { - val point = data.doubleHistogramData.points.first() - point.sum / point.count - } - MetricDataType.SUMMARY -> { - val point = data.doubleSummaryData.points.first() - point.sum / point.count - } - else -> error("Invalid metric type") - } - } - - private fun reset() { - totalWork = 0.0 - grantedWork = 0.0 - overcommittedWork = 0.0 - interferedWork = 0.0 - cpuUsage = 0.0 - cpuDemand = 0.0 - instanceCount = 0 - powerDraw = 0.0 - uptime = 0L - downtime = 0L - } - } - - /** - * An aggregator for server metrics before they are reported. - */ - private class ServerAggregator(attributes: Attributes) { - /** - * The static information about this server. - */ - val server = ServerInfo( - attributes[ResourceAttributes.HOST_ID]!!, - attributes[ResourceAttributes.HOST_NAME]!!, - attributes[ResourceAttributes.HOST_TYPE]!!, - attributes[ResourceAttributes.HOST_ARCH]!!, - attributes[ResourceAttributes.HOST_IMAGE_ID]!!, - attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, - attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), - attributes[AttributeKey.longKey("host.mem_capacity")]!!, - ) - - /** - * The [HostInfo] of the host on which the server is hosted. - */ - var host: HostInfo? = null - - @JvmField var uptime: Long = 0 - private var previousUptime = 0L - @JvmField var downtime: Long = 0 - private var previousDowntime = 0L - @JvmField var schedulingLatency = 0.0 - - fun record(monitor: ComputeMonitor, now: Long) { - monitor.record( - ServerData( - now, - server, - null, - uptime - previousUptime, - downtime - previousDowntime, - ) - ) - - previousUptime = uptime - previousDowntime = downtime - reset() - } - - private fun reset() { - host = null - uptime = 0L - downtime = 0L - } - } } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index f3690ee8..25d346fb 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -22,64 +22,31 @@ package org.opendc.telemetry.compute -import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer import org.opendc.telemetry.compute.table.ServiceData +import java.time.Instant /** * Collect the metrics of the compute service. */ -public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData { +public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) } /** * Extract a [ServiceData] object from the specified list of metric data. */ -public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData { - val resultKey = AttributeKey.stringKey("result") - val stateKey = AttributeKey.stringKey("state") - - var hostsUp = 0 - var hostsDown = 0 - - var serversPending = 0 - var serversActive = 0 - - var attemptsSuccess = 0 - var attemptsFailure = 0 - var attemptsError = 0 - - for (metric in metrics) { - when (metric.name) { - "scheduler.hosts" -> { - for (point in metric.longSumData.points) { - when (point.attributes[stateKey]) { - "up" -> hostsUp = point.value.toInt() - "down" -> hostsDown = point.value.toInt() - } - } - } - "scheduler.servers" -> { - for (point in metric.longSumData.points) { - when (point.attributes[stateKey]) { - "pending" -> serversPending = point.value.toInt() - "active" -> serversActive = point.value.toInt() - } - } - } - "scheduler.attempts" -> { - for (point in metric.longSumData.points) { - when (point.attributes[resultKey]) { - "success" -> attemptsSuccess = point.value.toInt() - "failure" -> attemptsFailure = point.value.toInt() - "error" -> attemptsError = point.value.toInt() - } - } - } +public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData { + lateinit var serviceData: ServiceData + val agg = ComputeMetricAggregator() + val monitor = object : ComputeMonitor { + override fun record(data: ServiceData) { + serviceData = data } } - return ServiceData(timestamp, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) + agg.process(metrics) + agg.collect(timestamp, monitor) + return serviceData } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt index e3ecda3d..8e787b97 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -22,20 +22,29 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for a particular host. */ public data class HostData( - public val timestamp: Long, - public val host: HostInfo, - public val totalWork: Double, - public val grantedWork: Double, - public val overcommittedWork: Double, - public val interferedWork: Double, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val instanceCount: Int, - public val powerDraw: Double, - public val uptime: Long, - public val downtime: Long, + val timestamp: Instant, + val host: HostInfo, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, + val cpuLimit: Double, + val cpuUsage: Double, + val cpuDemand: Double, + val cpuUtilization: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, + val powerUsage: Double, + val powerTotal: Double, + val uptime: Long, + val downtime: Long, + val bootTime: Instant? ) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index 7fde86d9..c48bff3a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -22,13 +22,22 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for a particular server. */ public data class ServerData( - public val timestamp: Long, - public val server: ServerInfo, - public val host: HostInfo?, - public val uptime: Long, - public val downtime: Long, + val timestamp: Instant, + val server: ServerInfo, + val host: HostInfo?, + val uptime: Long, + val downtime: Long, + val bootTime: Instant?, + val schedulingLatency: Long, + val cpuLimit: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, ) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index da2ebdf4..6db1399d 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -22,16 +22,18 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for the compute service. */ public data class ServiceData( - public val timestamp: Long, - public val hostsUp: Int, - public val hostsDown: Int, - public val serversPending: Int, - public val serversActive: Int, - public val attemptsSuccess: Int, - public val attemptsFailure: Int, - public val attemptsError: Int + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int ) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 8f19ab81..07f0ff7f 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -44,7 +44,7 @@ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List<MetricProducer>, private val exporter: MetricExporter, - private val exportInterval: Duration = Duration.ofMinutes(1) + private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} |
