Diffstat (limited to 'opendc-telemetry')
12 files changed, 643 insertions, 251 deletions
diff --git a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts index 6a3de9bc..cd8cb57a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts +++ b/opendc-telemetry/opendc-telemetry-compute/build.gradle.kts @@ -31,7 +31,6 @@ dependencies { api(platform(projects.opendcPlatform)) api(projects.opendcTelemetry.opendcTelemetrySdk) - implementation(projects.opendcCompute.opendcComputeSimulator) implementation(libs.opentelemetry.semconv) implementation(libs.kotlin.logging) } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt new file mode 100644 index 00000000..e9449634 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricAggregator.kt @@ -0,0 +1,448 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey +import io.opentelemetry.api.common.Attributes +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.data.PointData +import io.opentelemetry.sdk.resources.Resource +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes +import org.opendc.telemetry.compute.table.* +import java.time.Instant +import kotlin.math.roundToLong + +/** + * Helper class responsible for aggregating [MetricData] into [ServiceData], [HostData] and [ServerData]. + */ +public class ComputeMetricAggregator { + private val _service = ServiceAggregator() + private val _hosts = mutableMapOf<String, HostAggregator>() + private val _servers = mutableMapOf<String, ServerAggregator>() + + /** + * Process the specified [metrics] for this cycle. 
+ */ + public fun process(metrics: Collection<MetricData>) { + val service = _service + val hosts = _hosts + val servers = _servers + + for (metric in metrics) { + val resource = metric.resource + + when (metric.name) { + // ComputeService + "scheduler.hosts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "up" -> service.hostsUp = point.value.toInt() + "down" -> service.hostsDown = point.value.toInt() + } + } + } + "scheduler.servers" -> { + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "pending" -> service.serversPending = point.value.toInt() + "active" -> service.serversActive = point.value.toInt() + } + } + } + "scheduler.attempts" -> { + for (point in metric.longSumData.points) { + when (point.attributes[RESULT_KEY]) { + "success" -> service.attemptsSuccess = point.value.toInt() + "failure" -> service.attemptsFailure = point.value.toInt() + "error" -> service.attemptsError = point.value.toInt() + } + } + } + "scheduler.latency" -> { + for (point in metric.doubleHistogramData.points) { + val server = getServer(servers, point) ?: continue + server.schedulingLatency = (point.sum / point.count).roundToLong() + } + } + + // SimHost + "system.guests" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + when (point.attributes[STATE_KEY]) { + "terminated" -> agg.guestsTerminated = point.value.toInt() + "running" -> agg.guestsRunning = point.value.toInt() + "error" -> agg.guestsRunning = point.value.toInt() + "invalid" -> agg.guestsInvalid = point.value.toInt() + } + } + } + "system.cpu.limit" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.doubleGaugeData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.cpuLimit = point.value + server.host = agg.host + } else { + agg.cpuLimit = point.value + } + } + } + "system.cpu.usage" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuUsage = metric.doubleGaugeData.points.first().value + } + "system.cpu.demand" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuDemand = metric.doubleGaugeData.points.first().value + } + "system.cpu.utilization" -> { + val agg = getHost(hosts, resource) ?: continue + agg.cpuUtilization = metric.doubleGaugeData.points.first().value + } + "system.cpu.time" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + val server = getServer(servers, point) + val state = point.attributes[STATE_KEY] + if (server != null) { + when (state) { + "active" -> server.cpuActiveTime = point.value + "idle" -> server.cpuIdleTime = point.value + "steal" -> server.cpuStealTime = point.value + "lost" -> server.cpuLostTime = point.value + } + server.host = agg.host + } else { + when (state) { + "active" -> agg.cpuActiveTime = point.value + "idle" -> agg.cpuIdleTime = point.value + "steal" -> agg.cpuStealTime = point.value + "lost" -> agg.cpuLostTime = point.value + } + } + } + } + "system.power.usage" -> { + val agg = getHost(hosts, resource) ?: continue + agg.powerUsage = metric.doubleGaugeData.points.first().value + } + "system.power.total" -> { + val agg = getHost(hosts, resource) ?: continue + agg.powerTotal = metric.doubleSumData.points.first().value + } + "system.time" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longSumData.points) { + val server = getServer(servers, point) + + if (server != null) { + when (point.attributes[STATE_KEY]) { + "up" 
-> server.uptime = point.value + "down" -> server.downtime = point.value + } + server.host = agg.host + } else { + when (point.attributes[STATE_KEY]) { + "up" -> agg.uptime = point.value + "down" -> agg.downtime = point.value + } + } + } + } + "system.time.boot" -> { + val agg = getHost(hosts, resource) ?: continue + + for (point in metric.longGaugeData.points) { + val server = getServer(servers, point) + + if (server != null) { + server.bootTime = point.value + server.host = agg.host + } else { + agg.bootTime = point.value + } + } + } + } + } + } + + /** + * Collect the data via the [monitor]. + */ + public fun collect(now: Instant, monitor: ComputeMonitor) { + monitor.record(_service.collect(now)) + + for (host in _hosts.values) { + monitor.record(host.collect(now)) + } + + for (server in _servers.values) { + monitor.record(server.collect(now)) + } + } + + /** + * Obtain the [HostAggregator] for the specified [resource]. + */ + private fun getHost(hosts: MutableMap<String, HostAggregator>, resource: Resource): HostAggregator? { + val id = resource.attributes[HOST_ID] + return if (id != null) { + hosts.computeIfAbsent(id) { HostAggregator(resource) } + } else { + null + } + } + + /** + * Obtain the [ServerAggregator] for the specified [point]. + */ + private fun getServer(servers: MutableMap<String, ServerAggregator>, point: PointData): ServerAggregator? { + val id = point.attributes[ResourceAttributes.HOST_ID] + return if (id != null) { + servers.computeIfAbsent(id) { ServerAggregator(point.attributes) } + } else { + null + } + } + + /** + * An aggregator for service metrics before they are reported. + */ + internal class ServiceAggregator { + @JvmField var hostsUp = 0 + @JvmField var hostsDown = 0 + + @JvmField var serversPending = 0 + @JvmField var serversActive = 0 + + @JvmField var attemptsSuccess = 0 + @JvmField var attemptsFailure = 0 + @JvmField var attemptsError = 0 + + /** + * Finish the aggregation for this cycle. + */ + fun collect(now: Instant): ServiceData = toServiceData(now) + + /** + * Convert the aggregator state to an immutable [ServiceData]. + */ + private fun toServiceData(now: Instant): ServiceData { + return ServiceData(now, hostsUp, hostsDown, serversPending, serversActive, attemptsSuccess, attemptsFailure, attemptsError) + } + } + + /** + * An aggregator for host metrics before they are reported. + */ + internal class HostAggregator(resource: Resource) { + /** + * The static information about this host. 
+ */ + val host = HostInfo( + resource.attributes[HOST_ID]!!, + resource.attributes[HOST_NAME] ?: "", + resource.attributes[HOST_ARCH] ?: "", + resource.attributes[HOST_NCPUS]?.toInt() ?: 0, + resource.attributes[HOST_MEM_CAPACITY] ?: 0, + ) + + @JvmField var guestsTerminated = 0 + @JvmField var guestsRunning = 0 + @JvmField var guestsError = 0 + @JvmField var guestsInvalid = 0 + + @JvmField var cpuLimit = 0.0 + @JvmField var cpuUsage = 0.0 + @JvmField var cpuDemand = 0.0 + @JvmField var cpuUtilization = 0.0 + + @JvmField var cpuActiveTime = 0L + @JvmField var cpuIdleTime = 0L + @JvmField var cpuStealTime = 0L + @JvmField var cpuLostTime = 0L + private var previousCpuActiveTime = 0L + private var previousCpuIdleTime = 0L + private var previousCpuStealTime = 0L + private var previousCpuLostTime = 0L + + @JvmField var powerUsage = 0.0 + @JvmField var powerTotal = 0.0 + private var previousPowerTotal = 0.0 + + @JvmField var uptime = 0L + private var previousUptime = 0L + @JvmField var downtime = 0L + private var previousDowntime = 0L + @JvmField var bootTime = Long.MIN_VALUE + + /** + * Finish the aggregation for this cycle. + */ + fun collect(now: Instant): HostData { + val data = toHostData(now) + + // Reset intermediate state for next aggregation + previousCpuActiveTime = cpuActiveTime + previousCpuIdleTime = cpuIdleTime + previousCpuStealTime = cpuStealTime + previousCpuLostTime = cpuLostTime + previousPowerTotal = powerTotal + previousUptime = uptime + previousDowntime = downtime + + guestsTerminated = 0 + guestsRunning = 0 + guestsError = 0 + guestsInvalid = 0 + + cpuLimit = 0.0 + cpuUsage = 0.0 + cpuDemand = 0.0 + cpuUtilization = 0.0 + + powerUsage = 0.0 + + return data + } + + /** + * Convert the aggregator state to an immutable [HostData] instance. + */ + private fun toHostData(now: Instant): HostData { + return HostData( + now, + host, + guestsTerminated, + guestsRunning, + guestsError, + guestsInvalid, + cpuLimit, + cpuUsage, + cpuDemand, + cpuUtilization, + cpuActiveTime - previousCpuActiveTime, + cpuIdleTime - previousCpuIdleTime, + cpuStealTime - previousCpuStealTime, + cpuLostTime - previousCpuLostTime, + powerUsage, + powerTotal - previousPowerTotal, + uptime - previousUptime, + downtime - previousDowntime, + if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null + ) + } + } + + /** + * An aggregator for server metrics before they are reported. + */ + internal class ServerAggregator(attributes: Attributes) { + /** + * The static information about this server. + */ + val server = ServerInfo( + attributes[ResourceAttributes.HOST_ID]!!, + attributes[ResourceAttributes.HOST_NAME]!!, + attributes[ResourceAttributes.HOST_TYPE]!!, + attributes[ResourceAttributes.HOST_ARCH]!!, + attributes[ResourceAttributes.HOST_IMAGE_ID]!!, + attributes[ResourceAttributes.HOST_IMAGE_NAME]!!, + attributes[AttributeKey.longKey("host.num_cpus")]!!.toInt(), + attributes[AttributeKey.longKey("host.mem_capacity")]!!, + ) + + /** + * The [HostInfo] of the host on which the server is hosted. + */ + var host: HostInfo? 
= null + + @JvmField var uptime: Long = 0 + private var previousUptime = 0L + @JvmField var downtime: Long = 0 + private var previousDowntime = 0L + @JvmField var bootTime: Long = 0 + @JvmField var schedulingLatency = 0L + @JvmField var cpuLimit = 0.0 + @JvmField var cpuActiveTime = 0L + @JvmField var cpuIdleTime = 0L + @JvmField var cpuStealTime = 0L + @JvmField var cpuLostTime = 0L + private var previousCpuActiveTime = 0L + private var previousCpuIdleTime = 0L + private var previousCpuStealTime = 0L + private var previousCpuLostTime = 0L + + /** + * Finish the aggregation for this cycle. + */ + fun collect(now: Instant): ServerData { + val data = toServerData(now) + + previousUptime = uptime + previousDowntime = downtime + previousCpuActiveTime = cpuActiveTime + previousCpuIdleTime = cpuIdleTime + previousCpuStealTime = cpuStealTime + previousCpuLostTime = cpuLostTime + + host = null + cpuLimit = 0.0 + + return data + } + + /** + * Convert the aggregator state into an immutable [ServerData]. + */ + private fun toServerData(now: Instant): ServerData { + return ServerData( + now, + server, + host, + uptime - previousUptime, + downtime - previousDowntime, + if (bootTime != Long.MIN_VALUE) Instant.ofEpochMilli(bootTime) else null, + schedulingLatency, + cpuLimit, + cpuActiveTime - previousCpuActiveTime, + cpuIdleTime - previousCpuIdleTime, + cpuStealTime - previousCpuStealTime, + cpuLostTime - previousCpuLostTime + ) + } + } + + private companion object { + private val STATE_KEY = AttributeKey.stringKey("state") + private val RESULT_KEY = AttributeKey.stringKey("result") + } +} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt index 95e7ff9e..ea96f721 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMetricExporter.kt @@ -23,126 +23,29 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.common.CompletableResultCode -import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.metrics.data.* import io.opentelemetry.sdk.metrics.export.MetricExporter -import io.opentelemetry.semconv.resource.attributes.ResourceAttributes -import org.opendc.compute.service.driver.Host -import org.opendc.telemetry.compute.table.HostData import java.time.Clock /** * A [MetricExporter] that redirects data to a [ComputeMonitor] implementation. */ -public class ComputeMetricExporter( - private val clock: Clock, - private val hosts: Map<String, Host>, - private val monitor: ComputeMonitor -) : MetricExporter { +public class ComputeMetricExporter(private val clock: Clock, private val monitor: ComputeMonitor) : MetricExporter { + /** + * A [ComputeMetricAggregator] that actually performs the aggregation. 
+ */ + private val agg = ComputeMetricAggregator() override fun export(metrics: Collection<MetricData>): CompletableResultCode { return try { - reportHostMetrics(metrics) - reportServiceMetrics(metrics) + agg.process(metrics) + agg.collect(clock.instant(), monitor) CompletableResultCode.ofSuccess() } catch (e: Throwable) { CompletableResultCode.ofFailure() } } - private var lastHostMetrics: Map<String, HBuffer> = emptyMap() - private val hostMetricsSingleton = HBuffer() - - private fun reportHostMetrics(metrics: Collection<MetricData>) { - val hostMetrics = mutableMapOf<String, HBuffer>() - - for (metric in metrics) { - when (metric.name) { - "cpu.demand" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuDemand = v } - "cpu.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.cpuUsage = v } - "power.usage" -> mapDoubleSummary(metric, hostMetrics) { m, v -> m.powerDraw = v } - "cpu.work.total" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.totalWork = v } - "cpu.work.granted" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.grantedWork = v } - "cpu.work.overcommit" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.overcommittedWork = v } - "cpu.work.interference" -> mapDoubleSum(metric, hostMetrics) { m, v -> m.interferedWork = v } - "guests.active" -> mapLongSum(metric, hostMetrics) { m, v -> m.instanceCount = v.toInt() } - "host.time.up" -> mapLongSum(metric, hostMetrics) { m, v -> m.uptime = v } - "host.time.down" -> mapLongSum(metric, hostMetrics) { m, v -> m.downtime = v } - } - } - - for ((id, hostMetric) in hostMetrics) { - val lastHostMetric = lastHostMetrics.getOrDefault(id, hostMetricsSingleton) - val host = hosts[id] ?: continue - - monitor.record( - HostData( - clock.millis(), - host, - hostMetric.totalWork - lastHostMetric.totalWork, - hostMetric.grantedWork - lastHostMetric.grantedWork, - hostMetric.overcommittedWork - lastHostMetric.overcommittedWork, - hostMetric.interferedWork - lastHostMetric.interferedWork, - hostMetric.cpuUsage, - hostMetric.cpuDemand, - hostMetric.instanceCount, - hostMetric.powerDraw, - hostMetric.uptime - lastHostMetric.uptime, - hostMetric.downtime - lastHostMetric.downtime, - ) - ) - } - - lastHostMetrics = hostMetrics - } - - private fun mapDoubleSummary(data: MetricData, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { - val points = data.doubleSummaryData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - val avg = (point.percentileValues[0].value + point.percentileValues[1].value) / 2 - block(hostMetric, avg) - } - } - - private fun mapLongSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Long) -> Unit) { - val points = data?.longSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.value) - } - } - - private fun mapDoubleSum(data: MetricData?, hostMetrics: MutableMap<String, HBuffer>, block: (HBuffer, Double) -> Unit) { - val points = data?.doubleSumData?.points ?: emptyList() - for (point in points) { - val uid = point.attributes[ResourceAttributes.HOST_ID] ?: continue - val hostMetric = hostMetrics.computeIfAbsent(uid) { HBuffer() } - block(hostMetric, point.value) - } - } - - /** - * A buffer for host metrics before they are reported. 
- */ - private class HBuffer { - var totalWork: Double = 0.0 - var grantedWork: Double = 0.0 - var overcommittedWork: Double = 0.0 - var interferedWork: Double = 0.0 - var cpuUsage: Double = 0.0 - var cpuDemand: Double = 0.0 - var instanceCount: Int = 0 - var powerDraw: Double = 0.0 - var uptime: Long = 0 - var downtime: Long = 0 - } - - private fun reportServiceMetrics(metrics: Collection<MetricData>) { - monitor.record(extractServiceMetrics(clock.millis(), metrics)) - } - override fun flush(): CompletableResultCode = CompletableResultCode.ofSuccess() override fun shutdown(): CompletableResultCode = CompletableResultCode.ofSuccess() diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt index ec303b37..d51bcab4 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/ComputeMonitor.kt @@ -22,10 +22,6 @@ package org.opendc.telemetry.compute -import org.opendc.compute.api.Server -import org.opendc.compute.api.ServerState -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.HostData import org.opendc.telemetry.compute.table.ServerData import org.opendc.telemetry.compute.table.ServiceData @@ -35,16 +31,6 @@ import org.opendc.telemetry.compute.table.ServiceData */ public interface ComputeMonitor { /** - * This method is invoked when the state of a [Server] changes. - */ - public fun onStateChange(timestamp: Long, server: Server, newState: ServerState) {} - - /** - * This method is invoked when the state of a [Host] changes. - */ - public fun onStateChange(time: Long, host: Host, newState: HostState) {} - - /** * Record the specified [data]. */ public fun record(data: ServerData) {} diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt index d3d983b9..25d346fb 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/Helpers.kt @@ -24,88 +24,29 @@ package org.opendc.telemetry.compute import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricProducer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import org.opendc.compute.service.ComputeService -import org.opendc.compute.service.driver.Host -import org.opendc.compute.service.driver.HostListener -import org.opendc.compute.service.driver.HostState import org.opendc.telemetry.compute.table.ServiceData -import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader -import java.time.Clock - -/** - * Attach the specified monitor to the OpenDC Compute service. 
- */ -public suspend fun withMonitor( - scheduler: ComputeService, - clock: Clock, - metricProducer: MetricProducer, - monitor: ComputeMonitor, - exportInterval: Long = 5L * 60 * 1000, /* Every 5 min (which is the granularity of the workload trace) */ - block: suspend CoroutineScope.() -> Unit -): Unit = coroutineScope { - // Monitor host events - for (host in scheduler.hosts) { - monitor.onStateChange(clock.millis(), host, HostState.UP) - host.addListener(object : HostListener { - override fun onStateChanged(host: Host, newState: HostState) { - monitor.onStateChange(clock.millis(), host, newState) - } - }) - } - - val reader = CoroutineMetricReader( - this, - listOf(metricProducer), - ComputeMetricExporter(clock, scheduler.hosts.associateBy { it.uid.toString() }, monitor), - exportInterval - ) - - try { - block(this) - } finally { - reader.close() - } -} +import java.time.Instant /** * Collect the metrics of the compute service. */ -public fun collectServiceMetrics(timestamp: Long, metricProducer: MetricProducer): ServiceData { +public fun collectServiceMetrics(timestamp: Instant, metricProducer: MetricProducer): ServiceData { return extractServiceMetrics(timestamp, metricProducer.collectAllMetrics()) } /** * Extract a [ServiceData] object from the specified list of metric data. */ -public fun extractServiceMetrics(timestamp: Long, metrics: Collection<MetricData>): ServiceData { - var submittedVms = 0 - var queuedVms = 0 - var unscheduledVms = 0 - var runningVms = 0 - var finishedVms = 0 - var hosts = 0 - var availableHosts = 0 - - for (metric in metrics) { - val points = metric.longSumData.points - - if (points.isEmpty()) { - continue - } - - val value = points.first().value.toInt() - when (metric.name) { - "servers.submitted" -> submittedVms = value - "servers.waiting" -> queuedVms = value - "servers.unscheduled" -> unscheduledVms = value - "servers.active" -> runningVms = value - "servers.finished" -> finishedVms = value - "hosts.total" -> hosts = value - "hosts.available" -> availableHosts = value +public fun extractServiceMetrics(timestamp: Instant, metrics: Collection<MetricData>): ServiceData { + lateinit var serviceData: ServiceData + val agg = ComputeMetricAggregator() + val monitor = object : ComputeMonitor { + override fun record(data: ServiceData) { + serviceData = data } } - return ServiceData(timestamp, hosts, availableHosts, submittedVms, runningVms, finishedVms, queuedVms, unscheduledVms) + agg.process(metrics) + agg.collect(timestamp, monitor) + return serviceData } diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt new file mode 100644 index 00000000..7dca6186 --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/HostAttributes.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or 
substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +@file:JvmName("HostAttributes") +package org.opendc.telemetry.compute + +import io.opentelemetry.api.common.AttributeKey + +/** + * The identifier of the node hosting virtual machines. + */ +public val HOST_ID: AttributeKey<String> = AttributeKey.stringKey("node.id") + +/** + * The name of the node hosting virtual machines. + */ +public val HOST_NAME: AttributeKey<String> = AttributeKey.stringKey("node.name") + +/** + * The CPU architecture of the host node. + */ +public val HOST_ARCH: AttributeKey<String> = AttributeKey.stringKey("node.arch") + +/** + * The number of CPUs in the host node. + */ +public val HOST_NCPUS: AttributeKey<Long> = AttributeKey.longKey("node.num_cpus") + +/** + * The amount of memory installed in the host node in MiB. + */ +public val HOST_MEM_CAPACITY: AttributeKey<Long> = AttributeKey.longKey("node.mem_capacity") diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt index 8e6c34d0..8e787b97 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostData.kt @@ -22,22 +22,29 @@ package org.opendc.telemetry.compute.table -import org.opendc.compute.service.driver.Host +import java.time.Instant /** * A trace entry for a particular host. */ public data class HostData( - public val timestamp: Long, - public val host: Host, - public val totalWork: Double, - public val grantedWork: Double, - public val overcommittedWork: Double, - public val interferedWork: Double, - public val cpuUsage: Double, - public val cpuDemand: Double, - public val instanceCount: Int, - public val powerDraw: Double, - public val uptime: Long, - public val downtime: Long, + val timestamp: Instant, + val host: HostInfo, + val guestsTerminated: Int, + val guestsRunning: Int, + val guestsError: Int, + val guestsInvalid: Int, + val cpuLimit: Double, + val cpuUsage: Double, + val cpuDemand: Double, + val cpuUtilization: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, + val powerUsage: Double, + val powerTotal: Double, + val uptime: Long, + val downtime: Long, + val bootTime: Instant? 
) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt new file mode 100644 index 00000000..d9a5906b --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/HostInfo.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Information about a host exposed to the telemetry service. + */ +public data class HostInfo(val id: String, val name: String, val arch: String, val cpuCount: Int, val memCapacity: Long) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt index 2a9fa8a6..c48bff3a 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerData.kt @@ -22,14 +22,22 @@ package org.opendc.telemetry.compute.table -import org.opendc.compute.api.Server +import java.time.Instant /** * A trace entry for a particular server. 
*/ public data class ServerData( - public val timestamp: Long, - public val server: Server, - public val uptime: Long, - public val downtime: Long, + val timestamp: Instant, + val server: ServerInfo, + val host: HostInfo?, + val uptime: Long, + val downtime: Long, + val bootTime: Instant?, + val schedulingLatency: Long, + val cpuLimit: Double, + val cpuActiveTime: Long, + val cpuIdleTime: Long, + val cpuStealTime: Long, + val cpuLostTime: Long, ) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt new file mode 100644 index 00000000..b16e5f3d --- /dev/null +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServerInfo.kt @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2021 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.telemetry.compute.table + +/** + * Static information about a server exposed to the telemetry service. + */ +public data class ServerInfo( + val id: String, + val name: String, + val type: String, + val arch: String, + val imageId: String, + val imageName: String, + val cpuCount: Int, + val memCapacity: Long +) diff --git a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt index f6ff5db5..6db1399d 100644 --- a/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt +++ b/opendc-telemetry/opendc-telemetry-compute/src/main/kotlin/org/opendc/telemetry/compute/table/ServiceData.kt @@ -22,16 +22,18 @@ package org.opendc.telemetry.compute.table +import java.time.Instant + /** * A trace entry for the compute service. 
*/ public data class ServiceData( - public val timestamp: Long, - public val hostCount: Int, - public val activeHostCount: Int, - public val instanceCount: Int, - public val runningInstanceCount: Int, - public val finishedInstanceCount: Int, - public val queuedInstanceCount: Int, - public val failedInstanceCount: Int + val timestamp: Instant, + val hostsUp: Int, + val hostsDown: Int, + val serversPending: Int, + val serversActive: Int, + val attemptsSuccess: Int, + val attemptsFailure: Int, + val attemptsError: Int ) diff --git a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt index 9ee16fac..07f0ff7f 100644 --- a/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt +++ b/opendc-telemetry/opendc-telemetry-sdk/src/main/kotlin/org/opendc/telemetry/sdk/metrics/export/CoroutineMetricReader.kt @@ -26,14 +26,8 @@ import io.opentelemetry.sdk.metrics.data.MetricData import io.opentelemetry.sdk.metrics.export.MetricExporter import io.opentelemetry.sdk.metrics.export.MetricProducer import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import mu.KotlinLogging -import java.util.* -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine +import java.time.Duration /** * A helper class to read the metrics from a list of [MetricProducer]s and automatically export the metrics every @@ -44,56 +38,44 @@ import kotlin.coroutines.suspendCoroutine * @param scope The [CoroutineScope] to run the reader in. * @param producers The metric producers to gather metrics from. * @param exporter The export to export the metrics to. - * @param exportInterval The export interval in milliseconds. + * @param exportInterval The export interval. */ public class CoroutineMetricReader( scope: CoroutineScope, private val producers: List<MetricProducer>, private val exporter: MetricExporter, - private val exportInterval: Long = 60_000 + private val exportInterval: Duration = Duration.ofMinutes(5) ) : AutoCloseable { private val logger = KotlinLogging.logger {} - private val chan = Channel<List<MetricData>>(Channel.RENDEZVOUS) /** - * The metric reader job. + * The background job that is responsible for collecting the metrics every cycle. */ - private val readerJob = scope.launch { + private val job = scope.launch { + val intervalMs = exportInterval.toMillis() + while (isActive) { - delay(exportInterval) + delay(intervalMs) val metrics = mutableListOf<MetricData>() for (producer in producers) { metrics.addAll(producer.collectAllMetrics()) } - chan.send(Collections.unmodifiableList(metrics)) - } - } - /** - * The exporter job runs in the background to actually export the metrics. 
- */ - private val exporterJob = chan.consumeAsFlow() - .onEach { metrics -> - suspendCoroutine<Unit> { cont -> - try { - val result = exporter.export(metrics) - result.whenComplete { - if (!result.isSuccess) { - logger.trace { "Exporter failed" } - } - cont.resume(Unit) + try { + val result = exporter.export(metrics) + result.whenComplete { + if (!result.isSuccess) { + logger.trace { "Exporter failed" } } - } catch (cause: Throwable) { - logger.warn(cause) { "Exporter threw an Exception" } - cont.resume(Unit) } + } catch (cause: Throwable) { + logger.warn(cause) { "Exporter threw an Exception" } } } - .launchIn(scope) + } override fun close() { - readerJob.cancel() - exporterJob.cancel() + job.cancel() } } |
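The commit above replaces the old withMonitor helper with a ComputeMetricAggregator driven by ComputeMetricExporter and CoroutineMetricReader. Below is a minimal sketch (not part of this commit) of how the new pieces could be wired together; the function name startTelemetry and the scope, clock and producer parameters are illustrative assumptions, with the MetricProducer and Clock expected to come from the simulation environment rather than from this module.

import io.opentelemetry.sdk.metrics.export.MetricProducer
import kotlinx.coroutines.CoroutineScope
import org.opendc.telemetry.compute.ComputeMetricExporter
import org.opendc.telemetry.compute.ComputeMonitor
import org.opendc.telemetry.compute.table.HostData
import org.opendc.telemetry.compute.table.ServiceData
import org.opendc.telemetry.sdk.metrics.export.CoroutineMetricReader
import java.time.Clock
import java.time.Duration

// Illustrative helper (assumed name, not part of this commit): wire a ComputeMonitor to a metric producer.
fun startTelemetry(scope: CoroutineScope, clock: Clock, producer: MetricProducer): AutoCloseable {
    // A monitor that simply prints the aggregated service and host records each cycle.
    val monitor = object : ComputeMonitor {
        override fun record(data: ServiceData) {
            println("${data.timestamp}: ${data.serversActive} active servers, ${data.hostsUp} hosts up")
        }

        override fun record(data: HostData) {
            println("${data.timestamp}: host ${data.host.name} draws ${data.powerUsage} W")
        }
    }

    // The exporter feeds the collected MetricData through ComputeMetricAggregator into the monitor.
    val exporter = ComputeMetricExporter(clock, monitor)

    // The reader polls the producer on the given interval and hands the metrics to the exporter;
    // closing the returned reader cancels the background collection job.
    return CoroutineMetricReader(scope, listOf(producer), exporter, Duration.ofMinutes(5))
}

A one-off snapshot is also possible via collectServiceMetrics(clock.instant(), producer), which runs the same aggregation path synchronously and returns a single ServiceData record.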
